| import time |
| import cv2 |
| import torch |
| import numpy as np |
| from pathlib import Path |
| from numpy import ndarray |
| from pydantic import BaseModel |
| from ultralytics import YOLO |
import os
import sys
|
|
| from typing import Iterable, Generator, List, TypeVar, Tuple, Sequence, Any, Dict, Optional |
| from collections import deque, OrderedDict, defaultdict |
| import threading |
| from itertools import combinations |
| from concurrent.futures import ThreadPoolExecutor |
| import yaml |
| from cv2 import ( |
| bitwise_and, |
| findHomography, |
| warpPerspective, |
| cvtColor, |
| COLOR_BGR2GRAY, |
| threshold, |
| THRESH_BINARY, |
| getStructuringElement, |
| MORPH_RECT, |
| MORPH_TOPHAT, |
| GaussianBlur, |
| morphologyEx, |
| Canny, |
| connectedComponents, |
| perspectiveTransform, |
| RETR_EXTERNAL, |
| CHAIN_APPROX_SIMPLE, |
| findContours, |
| boundingRect, |
| dilate, |
| imread, |
| countNonZero |
| ) |
| import gc |
|
|
# Reduce CUDA memory fragmentation during long-running inference.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"


class BoundingBox(BaseModel):
| x1: int |
| y1: int |
| x2: int |
| y2: int |
| cls_id: int |
| conf: float |
| track_id: int | None = None |
|
|
|
|
| class TVFrameResult(BaseModel): |
| frame_id: int |
| boxes: list[BoundingBox] |
| keypoints: list[tuple[int, int]] |
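# Example (illustrative sketch): pydantic validates field types on construction;
# `track_id` stays None until a tracker assigns one.
#   >>> BoundingBox(x1=10, y1=20, x2=110, y2=220, cls_id=2, conf=0.91).track_id is None
#   True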
|
|
| V = TypeVar("V") |
| kp_threshold = 0.3 |
|
|
def create_batches(sequence: Iterable[V], batch_size: int) -> Generator[List[V], None, None]:
    """Yield consecutive batches of at most `batch_size` items from `sequence`."""
    batch_size = max(batch_size, 1)
    current_batch: List[V] = []
    for element in sequence:
        current_batch.append(element)
        if len(current_batch) == batch_size:
            yield current_batch
            current_batch = []
    if current_batch:
        yield current_batch
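# Example (illustrative sketch): a trailing partial batch is yielded as-is.
#   >>> list(create_batches(range(5), 2))
#   [[0, 1], [2, 3], [4]]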
|
|
| from torch import nn |
| from torch.nn import functional as F |
| from sklearn.cluster import KMeans |
| from PIL import Image |
|
|
| _OSNET_MODEL = None |
| team_classifier_path = None |
|
|
| BALL_ID = 0 |
| GK_ID = 1 |
| PLAYER_ID = 2 |
| REF_ID = 3 |
| TEAM_1_ID = 6 |
| TEAM_2_ID = 7 |
|
|
| pretrained_urls = { |
| 'osnet_x1_0': |
| 'https://drive.google.com/uc?id=1LaG1EJpHrxdAxKnSCJ_i0u-nbxSAeiFY', |
| } |
|
|
| class ConvLayer(nn.Module): |
| """Convolution layer (conv + bn + relu).""" |
|
|
| def __init__( |
| self, |
| in_channels, |
| out_channels, |
| kernel_size, |
| stride=1, |
| padding=0, |
| groups=1, |
| IN=False |
| ): |
| super(ConvLayer, self).__init__() |
| self.conv = nn.Conv2d( |
| in_channels, |
| out_channels, |
| kernel_size, |
| stride=stride, |
| padding=padding, |
| bias=False, |
| groups=groups |
| ) |
| if IN: |
| self.bn = nn.InstanceNorm2d(out_channels, affine=True) |
| else: |
| self.bn = nn.BatchNorm2d(out_channels) |
| self.relu = nn.ReLU(inplace=True) |
|
|
| def forward(self, x): |
| x = self.conv(x) |
| x = self.bn(x) |
| x = self.relu(x) |
| return x |
|
|
|
|
| class Conv1x1(nn.Module): |
| """1x1 convolution + bn + relu.""" |
|
|
| def __init__(self, in_channels, out_channels, stride=1, groups=1): |
| super(Conv1x1, self).__init__() |
| self.conv = nn.Conv2d( |
| in_channels, |
| out_channels, |
| 1, |
| stride=stride, |
| padding=0, |
| bias=False, |
| groups=groups |
| ) |
| self.bn = nn.BatchNorm2d(out_channels) |
| self.relu = nn.ReLU(inplace=True) |
|
|
| def forward(self, x): |
| x = self.conv(x) |
| x = self.bn(x) |
| x = self.relu(x) |
| return x |
|
|
|
|
| class Conv1x1Linear(nn.Module): |
| """1x1 convolution + bn (w/o non-linearity).""" |
|
|
| def __init__(self, in_channels, out_channels, stride=1): |
| super(Conv1x1Linear, self).__init__() |
| self.conv = nn.Conv2d( |
| in_channels, out_channels, 1, stride=stride, padding=0, bias=False |
| ) |
| self.bn = nn.BatchNorm2d(out_channels) |
|
|
| def forward(self, x): |
| x = self.conv(x) |
| x = self.bn(x) |
| return x |
|
|
|
|
| class Conv3x3(nn.Module): |
| """3x3 convolution + bn + relu.""" |
|
|
| def __init__(self, in_channels, out_channels, stride=1, groups=1): |
| super(Conv3x3, self).__init__() |
| self.conv = nn.Conv2d( |
| in_channels, |
| out_channels, |
| 3, |
| stride=stride, |
| padding=1, |
| bias=False, |
| groups=groups |
| ) |
| self.bn = nn.BatchNorm2d(out_channels) |
| self.relu = nn.ReLU(inplace=True) |
|
|
| def forward(self, x): |
| x = self.conv(x) |
| x = self.bn(x) |
| x = self.relu(x) |
| return x |
|
|
|
|
| class LightConv3x3(nn.Module): |
| """Lightweight 3x3 convolution. |
| |
| 1x1 (linear) + dw 3x3 (nonlinear). |
| """ |
|
|
| def __init__(self, in_channels, out_channels): |
| super(LightConv3x3, self).__init__() |
| self.conv1 = nn.Conv2d( |
| in_channels, out_channels, 1, stride=1, padding=0, bias=False |
| ) |
| self.conv2 = nn.Conv2d( |
| out_channels, |
| out_channels, |
| 3, |
| stride=1, |
| padding=1, |
| bias=False, |
| groups=out_channels |
| ) |
| self.bn = nn.BatchNorm2d(out_channels) |
| self.relu = nn.ReLU(inplace=True) |
|
|
| def forward(self, x): |
| x = self.conv1(x) |
| x = self.conv2(x) |
| x = self.bn(x) |
| x = self.relu(x) |
| return x |
|
|
|
|
class ChannelGate(nn.Module):
    """A mini-network that generates channel-wise gates conditioned on the input tensor."""
|
|
| def __init__( |
| self, |
| in_channels, |
| num_gates=None, |
| return_gates=False, |
| gate_activation='sigmoid', |
| reduction=16, |
| layer_norm=False |
| ): |
| super(ChannelGate, self).__init__() |
| if num_gates is None: |
| num_gates = in_channels |
| self.return_gates = return_gates |
| self.global_avgpool = nn.AdaptiveAvgPool2d(1) |
| self.fc1 = nn.Conv2d( |
| in_channels, |
| in_channels // reduction, |
| kernel_size=1, |
| bias=True, |
| padding=0 |
| ) |
| self.norm1 = None |
| if layer_norm: |
| self.norm1 = nn.LayerNorm((in_channels // reduction, 1, 1)) |
| self.relu = nn.ReLU(inplace=True) |
| self.fc2 = nn.Conv2d( |
| in_channels // reduction, |
| num_gates, |
| kernel_size=1, |
| bias=True, |
| padding=0 |
| ) |
| if gate_activation == 'sigmoid': |
| self.gate_activation = nn.Sigmoid() |
| elif gate_activation == 'relu': |
| self.gate_activation = nn.ReLU(inplace=True) |
| elif gate_activation == 'linear': |
| self.gate_activation = None |
| else: |
| raise RuntimeError( |
| "Unknown gate activation: {}".format(gate_activation) |
| ) |
|
|
| def forward(self, x): |
| input = x |
| x = self.global_avgpool(x) |
| x = self.fc1(x) |
| if self.norm1 is not None: |
| x = self.norm1(x) |
| x = self.relu(x) |
| x = self.fc2(x) |
| if self.gate_activation is not None: |
| x = self.gate_activation(x) |
| if self.return_gates: |
| return x |
| return input * x |
|
|
|
|
| class OSBlock(nn.Module): |
| """Omni-scale feature learning block.""" |
|
|
| def __init__( |
| self, |
| in_channels, |
| out_channels, |
| IN=False, |
| bottleneck_reduction=4, |
| **kwargs |
| ): |
| super(OSBlock, self).__init__() |
| mid_channels = out_channels // bottleneck_reduction |
| self.conv1 = Conv1x1(in_channels, mid_channels) |
| self.conv2a = LightConv3x3(mid_channels, mid_channels) |
| self.conv2b = nn.Sequential( |
| LightConv3x3(mid_channels, mid_channels), |
| LightConv3x3(mid_channels, mid_channels), |
| ) |
| self.conv2c = nn.Sequential( |
| LightConv3x3(mid_channels, mid_channels), |
| LightConv3x3(mid_channels, mid_channels), |
| LightConv3x3(mid_channels, mid_channels), |
| ) |
| self.conv2d = nn.Sequential( |
| LightConv3x3(mid_channels, mid_channels), |
| LightConv3x3(mid_channels, mid_channels), |
| LightConv3x3(mid_channels, mid_channels), |
| LightConv3x3(mid_channels, mid_channels), |
| ) |
| self.gate = ChannelGate(mid_channels) |
| self.conv3 = Conv1x1Linear(mid_channels, out_channels) |
| self.downsample = None |
| if in_channels != out_channels: |
| self.downsample = Conv1x1Linear(in_channels, out_channels) |
| self.IN = None |
| if IN: |
| self.IN = nn.InstanceNorm2d(out_channels, affine=True) |
|
|
| def forward(self, x): |
| identity = x |
| x1 = self.conv1(x) |
| x2a = self.conv2a(x1) |
| x2b = self.conv2b(x1) |
| x2c = self.conv2c(x1) |
| x2d = self.conv2d(x1) |
| x2 = self.gate(x2a) + self.gate(x2b) + self.gate(x2c) + self.gate(x2d) |
| x3 = self.conv3(x2) |
| if self.downsample is not None: |
| identity = self.downsample(identity) |
| out = x3 + identity |
| if self.IN is not None: |
| out = self.IN(out) |
| return F.relu(out) |
|
|
|
|
class OSNet(nn.Module):
    """Omni-Scale Network for person re-identification (Zhou et al., ICCV 2019)."""
|
|
| def __init__( |
| self, |
| num_classes, |
| blocks, |
| layers, |
| channels, |
| feature_dim=512, |
| loss='softmax', |
| IN=False, |
| **kwargs |
| ): |
| super(OSNet, self).__init__() |
| num_blocks = len(blocks) |
| assert num_blocks == len(layers) |
| assert num_blocks == len(channels) - 1 |
| self.loss = loss |
| self.feature_dim = feature_dim |
|
|
| |
| self.conv1 = ConvLayer(3, channels[0], 7, stride=2, padding=3, IN=IN) |
| self.maxpool = nn.MaxPool2d(3, stride=2, padding=1) |
| self.conv2 = self._make_layer( |
| blocks[0], |
| layers[0], |
| channels[0], |
| channels[1], |
| reduce_spatial_size=True, |
| IN=IN |
| ) |
| self.conv3 = self._make_layer( |
| blocks[1], |
| layers[1], |
| channels[1], |
| channels[2], |
| reduce_spatial_size=True |
| ) |
| self.conv4 = self._make_layer( |
| blocks[2], |
| layers[2], |
| channels[2], |
| channels[3], |
| reduce_spatial_size=False |
| ) |
| self.conv5 = Conv1x1(channels[3], channels[3]) |
| self.global_avgpool = nn.AdaptiveAvgPool2d(1) |
| |
| self.fc = self._construct_fc_layer( |
| self.feature_dim, channels[3], dropout_p=None |
| ) |
| |
| self.classifier = nn.Linear(self.feature_dim, num_classes) |
|
|
| self._init_params() |
|
|
| def _make_layer( |
| self, |
| block, |
| layer, |
| in_channels, |
| out_channels, |
| reduce_spatial_size, |
| IN=False |
| ): |
| layers = [] |
|
|
| layers.append(block(in_channels, out_channels, IN=IN)) |
| for i in range(1, layer): |
| layers.append(block(out_channels, out_channels, IN=IN)) |
|
|
| if reduce_spatial_size: |
| layers.append( |
| nn.Sequential( |
| Conv1x1(out_channels, out_channels), |
| nn.AvgPool2d(2, stride=2) |
| ) |
| ) |
|
|
| return nn.Sequential(*layers) |
|
|
| def _construct_fc_layer(self, fc_dims, input_dim, dropout_p=None): |
| if fc_dims is None or fc_dims < 0: |
| self.feature_dim = input_dim |
| return None |
|
|
| if isinstance(fc_dims, int): |
| fc_dims = [fc_dims] |
|
|
| layers = [] |
| for dim in fc_dims: |
| layers.append(nn.Linear(input_dim, dim)) |
| layers.append(nn.BatchNorm1d(dim)) |
| layers.append(nn.ReLU(inplace=True)) |
| if dropout_p is not None: |
| layers.append(nn.Dropout(p=dropout_p)) |
| input_dim = dim |
|
|
| self.feature_dim = fc_dims[-1] |
|
|
| return nn.Sequential(*layers) |
|
|
| def _init_params(self): |
| for m in self.modules(): |
| if isinstance(m, nn.Conv2d): |
| nn.init.kaiming_normal_( |
| m.weight, mode='fan_out', nonlinearity='relu' |
| ) |
| if m.bias is not None: |
| nn.init.constant_(m.bias, 0) |
|
|
| elif isinstance(m, nn.BatchNorm2d): |
| nn.init.constant_(m.weight, 1) |
| nn.init.constant_(m.bias, 0) |
|
|
| elif isinstance(m, nn.BatchNorm1d): |
| nn.init.constant_(m.weight, 1) |
| nn.init.constant_(m.bias, 0) |
|
|
| elif isinstance(m, nn.Linear): |
| nn.init.normal_(m.weight, 0, 0.01) |
| if m.bias is not None: |
| nn.init.constant_(m.bias, 0) |
|
|
| def featuremaps(self, x): |
| x = self.conv1(x) |
| x = self.maxpool(x) |
| x = self.conv2(x) |
| x = self.conv3(x) |
| x = self.conv4(x) |
| x = self.conv5(x) |
| return x |
|
|
| def forward(self, x, return_featuremaps=False): |
| x = self.featuremaps(x) |
| if return_featuremaps: |
| return x |
| v = self.global_avgpool(x) |
| v = v.view(v.size(0), -1) |
| if self.fc is not None: |
| v = self.fc(v) |
| if not self.training: |
| return v |
| y = self.classifier(v) |
| if self.loss == 'softmax': |
| return y |
| elif self.loss == 'triplet': |
| return y, v |
| else: |
| raise KeyError("Unsupported loss: {}".format(self.loss)) |
|
|
|
|
| def init_pretrained_weights(model, key=''): |
| import os |
| import errno |
| import gdown |
| from collections import OrderedDict |
|
|
| def _get_torch_home(): |
| ENV_TORCH_HOME = 'TORCH_HOME' |
| ENV_XDG_CACHE_HOME = 'XDG_CACHE_HOME' |
| DEFAULT_CACHE_DIR = '~/.cache' |
| torch_home = os.path.expanduser( |
| os.getenv( |
| ENV_TORCH_HOME, |
| os.path.join( |
| os.getenv(ENV_XDG_CACHE_HOME, DEFAULT_CACHE_DIR), 'torch' |
| ) |
| ) |
| ) |
| return torch_home |
|
|
| torch_home = _get_torch_home() |
| model_dir = os.path.join(torch_home, 'checkpoints') |
    try:
        os.makedirs(model_dir)
    except OSError as e:
        if e.errno == errno.EEXIST:
            # Directory already exists; nothing to do.
            pass
        else:
            # Unexpected OSError: re-raise.
            raise
| filename = key + '_imagenet.pth' |
| cached_file = os.path.join(model_dir, filename) |
|
|
| if not os.path.exists(cached_file): |
| gdown.download(pretrained_urls[key], cached_file, quiet=False) |
|
|
| state_dict = torch.load(cached_file) |
| model_dict = model.state_dict() |
| new_state_dict = OrderedDict() |
| matched_layers, discarded_layers = [], [] |
|
|
| for k, v in state_dict.items(): |
| if k.startswith('module.'): |
| k = k[7:] |
|
|
| if k in model_dict and model_dict[k].size() == v.size(): |
| new_state_dict[k] = v |
| matched_layers.append(k) |
| else: |
| discarded_layers.append(k) |
|
|
| model_dict.update(new_state_dict) |
| model.load_state_dict(model_dict) |
|
|
| if len(matched_layers) == 0: |
| print( |
| 'The pretrained weights from "{}" cannot be loaded, ' |
| 'please check the key names manually ' |
| '(** ignored and continue **)'.format(cached_file) |
| ) |
| else: |
| print( |
| 'Successfully loaded imagenet pretrained weights from "{}"'. |
| format(cached_file) |
| ) |
| if len(discarded_layers) > 0: |
| print( |
| '** The following layers are discarded ' |
| 'due to unmatched keys or layer size: {}'. |
| format(discarded_layers) |
| ) |
|
|
|
|
def osnet_x1_0(num_classes=1000, pretrained=True, loss='softmax', **kwargs):
    # Standard OSNet x1.0 configuration.
    model = OSNet(
        num_classes,
        blocks=[OSBlock, OSBlock, OSBlock],
        layers=[2, 2, 2],
        channels=[64, 256, 384, 512],
        loss=loss,
        **kwargs
    )
    if pretrained:
        init_pretrained_weights(model, key='osnet_x1_0')
    return model
|
|
import torchvision.transforms as T
import os.path as osp
|
|
| def load_checkpoint(fpath): |
| fpath = osp.abspath(osp.expanduser(fpath)) |
| map_location = None if torch.cuda.is_available() else 'cpu' |
| |
| checkpoint = torch.load(fpath, map_location=map_location, weights_only=False) |
| return checkpoint |
|
|
| def load_pretrained_weights(model, weight_path): |
| checkpoint = load_checkpoint(weight_path) |
| if 'state_dict' in checkpoint: |
| state_dict = checkpoint['state_dict'] |
| else: |
| state_dict = checkpoint |
| model_dict = model.state_dict() |
| new_state_dict = OrderedDict() |
| matched_layers, discarded_layers = ([], []) |
| for k, v in state_dict.items(): |
| if k.startswith('module.'): |
| k = k[7:] |
| if k in model_dict and model_dict[k].size() == v.size(): |
| new_state_dict[k] = v |
| matched_layers.append(k) |
| else: |
| discarded_layers.append(k) |
| model_dict.update(new_state_dict) |
| model.load_state_dict(model_dict) |
|
|
def load_osnet(device="cuda", weight_path=None):
    """Build osnet_x1_0 and load weights (e.g. model.pth.tar-100) via load_pretrained_weights.

    The caller is expected to store the returned model in the module-level
    _OSNET_MODEL used by extract_osnet_embeddings.
    """
    model = osnet_x1_0(num_classes=1, loss='softmax', pretrained=False, use_gpu=device == 'cuda')

    weight_path = Path(weight_path)
    if weight_path.exists():
        load_pretrained_weights(model, str(weight_path))
    model.eval()
    model.to(device)
    return model
|
|
def filter_player_boxes(
    boxes: List[BoundingBox],
    min_area: int = 1500
) -> List[BoundingBox]:
    """Keep only player detections (cls_id == PLAYER_ID). `min_area` is currently unused."""
    return [b for b in boxes if b.cls_id == PLAYER_ID]
|
|
| |
| OSNET_IMAGE_SIZE = (64, 32) |
| OSNET_PREPROCESS = T.Compose([ |
| T.Resize(OSNET_IMAGE_SIZE), |
| T.ToTensor(), |
| T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), |
| ]) |
|
|
def crop_upper_body(frame: np.ndarray, box: BoundingBox) -> np.ndarray:
    """Crop the detection box from the frame (currently the full box, not just the upper body)."""
    return frame[
        max(0, box.y1):max(0, box.y2),
        max(0, box.x1):max(0, box.x2)
    ]
|
|
| def preprocess_osnet(crop: np.ndarray) -> torch.Tensor: |
| """BGR crop -> RGB PIL -> Resize, ToTensor, ImageNet Normalize (same as team_cluster).""" |
| rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB) |
| pil = Image.fromarray(rgb) |
| return OSNET_PREPROCESS(pil) |
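# Example (illustrative sketch): any BGR crop is resized to OSNet's 64x32 input
# and returned as a normalized CHW tensor.
#   >>> preprocess_osnet(np.zeros((100, 50, 3), dtype=np.uint8)).shape
#   torch.Size([3, 64, 32])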
|
|
@torch.no_grad()
def extract_osnet_embeddings(
    frames: List[np.ndarray],
    batch_boxes: dict[int, List[BoundingBox]],
    device="cuda",
    batch_size=4
) -> Tuple[np.ndarray, List[BoundingBox]]:
    """Embed every player crop with OSNet; returns (embeddings, boxes) or (None, None) if no crops.

    _OSNET_MODEL must be initialised (see load_osnet) before calling this.
    """
    crops = []
    meta = []
    for frame, boxes in zip(frames, batch_boxes.values()):
        players = filter_player_boxes(boxes)
        for box in players:
            crop = crop_upper_body(frame, box)
            if crop.size == 0:
                continue
            crops.append(preprocess_osnet(crop))
            meta.append(box)

    if not crops:
        return None, None

    all_embeddings = []

    # The @torch.no_grad() decorator already disables autograd here.
    for start in range(0, len(crops), batch_size):
        end = start + batch_size
        batch = torch.stack(crops[start:end]).float().to(device)
        embeddings_chunk = _OSNET_MODEL(batch)
        all_embeddings.append(embeddings_chunk.cpu())
        del batch, embeddings_chunk

    embeddings = torch.cat(all_embeddings, dim=0).numpy()
    return embeddings, meta
|
|
def aggregate_by_track(
    embeddings: np.ndarray,
    meta: List[BoundingBox]
):
    """Average embeddings per track_id and L2-normalize; boxes without a track_id get a unique key."""
| track_map = defaultdict(list) |
| box_map = {} |
|
|
| |
| for emb, box in zip(embeddings, meta): |
| key = box.track_id if box.track_id is not None else id(box) |
| track_map[key].append(emb) |
| box_map[key] = box |
|
|
| agg_embeddings = [] |
| agg_boxes = [] |
|
|
| for key, embs in track_map.items(): |
| mean_emb = np.mean(embs, axis=0) |
| mean_emb /= np.linalg.norm(mean_emb) |
|
|
| agg_embeddings.append(mean_emb) |
| agg_boxes.append(box_map[key]) |
|
|
| return np.array(agg_embeddings), agg_boxes |
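# Example (illustrative sketch): two detections sharing track_id=7 collapse into
# a single L2-normalized mean embedding paired with one representative box.
#   >>> b = BoundingBox(x1=0, y1=0, x2=10, y2=10, cls_id=2, conf=0.9, track_id=7)
#   >>> embs, boxes = aggregate_by_track(np.array([[3.0, 0.0], [0.0, 3.0]]), [b, b])
#   >>> embs.round(3).tolist(), boxes[0].track_id
#   ([[0.707, 0.707]], 7)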
|
|
def cluster_teams(embeddings: np.ndarray):
    """K-means (k=2) team split over player embeddings; None if fewer than 2 samples."""
    if len(embeddings) < 2:
        return None

    kmeans = KMeans(n_clusters=2, n_init=2, random_state=42)
    return kmeans.fit_predict(embeddings)
|
|
| def update_team_ids( |
| boxes: List[BoundingBox], |
| labels: np.ndarray |
| ): |
| for box, label in zip(boxes, labels): |
| box.cls_id = TEAM_1_ID if label == 0 else TEAM_2_ID |
|
|
| def classify_teams_batch( |
| frames: List[np.ndarray], |
| |
| batch_boxes: dict[int, List[BoundingBox]], |
| batch_size, |
| device="cuda" |
| ): |
| |
| embeddings, meta = extract_osnet_embeddings( |
| frames, batch_boxes, device, batch_size |
| ) |
| if embeddings is None: |
| return |
| embeddings, agg_boxes = aggregate_by_track(embeddings, meta) |
| n = len(embeddings) |
| if n == 0: |
| return |
| if n == 1: |
| agg_boxes[0].cls_id = TEAM_1_ID |
| return |
|
|
    kmeans = KMeans(n_clusters=2, n_init=2, random_state=42)
    kmeans.fit(embeddings)
    centroids = kmeans.cluster_centers_

    # If the two centroids are nearly identical (cosine similarity > 0.95),
    # the split is unreliable: assign everyone to a single team.
    c0, c1 = centroids[0], centroids[1]
    norm_0 = np.linalg.norm(c0)
    norm_1 = np.linalg.norm(c1)
    similarity = np.dot(c0, c1) / (norm_0 * norm_1 + 1e-12)

    if similarity > 0.95:
        for b in agg_boxes:
            b.cls_id = TEAM_1_ID
        return

    # Make the labelling deterministic: cluster 0 is always the centroid with the larger norm.
    if norm_0 <= norm_1:
        kmeans.labels_ = 1 - kmeans.labels_
    update_team_ids(agg_boxes, kmeans.labels_)
|
|
| def get_cls_net(config, pretrained='', **kwargs): |
| """Create keypoint detection model with softmax activation""" |
| |
|
|
| def conv3x3(in_planes, out_planes, stride=1): |
| """3x3 convolution with padding""" |
| return nn.Conv2d(in_planes, out_planes, kernel_size=3, |
| stride=stride, padding=1, bias=False) |
| |
| class BasicBlock(nn.Module): |
| expansion = 1 |
|
|
| def __init__(self, inplanes, planes, stride=1, downsample=None): |
| super(BasicBlock, self).__init__() |
| self.conv1 = conv3x3(inplanes, planes, stride) |
| self.bn1 = BatchNorm2d(planes, momentum=BN_MOMENTUM) |
| self.relu = nn.ReLU(inplace=True) |
| self.conv2 = conv3x3(planes, planes) |
| self.bn2 = BatchNorm2d(planes, momentum=BN_MOMENTUM) |
| self.downsample = downsample |
| self.stride = stride |
|
|
| def forward(self, x): |
| residual = x |
|
|
| out = self.conv1(x) |
| out = self.bn1(out) |
| out = self.relu(out) |
|
|
| out = self.conv2(out) |
| out = self.bn2(out) |
|
|
| if self.downsample is not None: |
| residual = self.downsample(x) |
|
|
| out += residual |
| out = self.relu(out) |
|
|
| return out |
| |
| class Bottleneck(nn.Module): |
| expansion = 4 |
|
|
| def __init__(self, inplanes, planes, stride=1, downsample=None): |
| super(Bottleneck, self).__init__() |
| self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False) |
| self.bn1 = BatchNorm2d(planes, momentum=BN_MOMENTUM) |
| self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, |
| padding=1, bias=False) |
| self.bn2 = BatchNorm2d(planes, momentum=BN_MOMENTUM) |
| self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1, |
| bias=False) |
| self.bn3 = BatchNorm2d(planes * self.expansion, |
| momentum=BN_MOMENTUM) |
| self.relu = nn.ReLU(inplace=True) |
| self.downsample = downsample |
| self.stride = stride |
|
|
| def forward(self, x): |
| residual = x |
|
|
| out = self.conv1(x) |
| out = self.bn1(out) |
| out = self.relu(out) |
|
|
| out = self.conv2(out) |
| out = self.bn2(out) |
| out = self.relu(out) |
|
|
| out = self.conv3(out) |
| out = self.bn3(out) |
|
|
| if self.downsample is not None: |
| residual = self.downsample(x) |
|
|
| out += residual |
| out = self.relu(out) |
|
|
| return out |
|
|
| BatchNorm2d = nn.BatchNorm2d |
| BN_MOMENTUM = 0.1 |
| blocks_dict = { |
| 'BASIC': BasicBlock, |
| 'BOTTLENECK': Bottleneck |
| } |
| class HighResolutionModule(nn.Module): |
| def __init__(self, num_branches, blocks, num_blocks, num_inchannels, |
| num_channels, fuse_method, multi_scale_output=True): |
| super(HighResolutionModule, self).__init__() |
| self._check_branches( |
| num_branches, blocks, num_blocks, num_inchannels, num_channels) |
|
|
| self.num_inchannels = num_inchannels |
| self.fuse_method = fuse_method |
| self.num_branches = num_branches |
|
|
| self.multi_scale_output = multi_scale_output |
|
|
| self.branches = self._make_branches( |
| num_branches, blocks, num_blocks, num_channels) |
| self.fuse_layers = self._make_fuse_layers() |
| self.relu = nn.ReLU(inplace=True) |
|
|
| def _check_branches(self, num_branches, blocks, num_blocks, |
| num_inchannels, num_channels): |
| if num_branches != len(num_blocks): |
| error_msg = 'NUM_BRANCHES({}) <> NUM_BLOCKS({})'.format( |
| num_branches, len(num_blocks)) |
| raise ValueError(error_msg) |
|
|
| if num_branches != len(num_channels): |
| error_msg = 'NUM_BRANCHES({}) <> NUM_CHANNELS({})'.format( |
| num_branches, len(num_channels)) |
| raise ValueError(error_msg) |
|
|
| if num_branches != len(num_inchannels): |
| error_msg = 'NUM_BRANCHES({}) <> NUM_INCHANNELS({})'.format( |
| num_branches, len(num_inchannels)) |
| raise ValueError(error_msg) |
|
|
| def _make_one_branch(self, branch_index, block, num_blocks, num_channels, |
| stride=1): |
| downsample = None |
| if stride != 1 or \ |
| self.num_inchannels[branch_index] != num_channels[branch_index] * block.expansion: |
| downsample = nn.Sequential( |
| nn.Conv2d(self.num_inchannels[branch_index], |
| num_channels[branch_index] * block.expansion, |
| kernel_size=1, stride=stride, bias=False), |
| BatchNorm2d(num_channels[branch_index] * block.expansion, |
| momentum=BN_MOMENTUM), |
| ) |
|
|
| layers = [] |
| layers.append(block(self.num_inchannels[branch_index], |
| num_channels[branch_index], stride, downsample)) |
| self.num_inchannels[branch_index] = \ |
| num_channels[branch_index] * block.expansion |
| for i in range(1, num_blocks[branch_index]): |
| layers.append(block(self.num_inchannels[branch_index], |
| num_channels[branch_index])) |
|
|
| return nn.Sequential(*layers) |
|
|
| def _make_branches(self, num_branches, block, num_blocks, num_channels): |
| branches = [] |
|
|
| for i in range(num_branches): |
| branches.append( |
| self._make_one_branch(i, block, num_blocks, num_channels)) |
|
|
| return nn.ModuleList(branches) |
|
|
| def _make_fuse_layers(self): |
| if self.num_branches == 1: |
| return None |
|
|
| num_branches = self.num_branches |
| num_inchannels = self.num_inchannels |
| fuse_layers = [] |
| for i in range(num_branches if self.multi_scale_output else 1): |
| fuse_layer = [] |
| for j in range(num_branches): |
| if j > i: |
| fuse_layer.append(nn.Sequential( |
| nn.Conv2d(num_inchannels[j], |
| num_inchannels[i], |
| 1, |
| 1, |
| 0, |
| bias=False), |
| BatchNorm2d(num_inchannels[i], momentum=BN_MOMENTUM))) |
| |
| elif j == i: |
| fuse_layer.append(None) |
| else: |
| conv3x3s = [] |
| for k in range(i - j): |
| if k == i - j - 1: |
| num_outchannels_conv3x3 = num_inchannels[i] |
| conv3x3s.append(nn.Sequential( |
| nn.Conv2d(num_inchannels[j], |
| num_outchannels_conv3x3, |
| 3, 2, 1, bias=False), |
| BatchNorm2d(num_outchannels_conv3x3, momentum=BN_MOMENTUM))) |
| else: |
| num_outchannels_conv3x3 = num_inchannels[j] |
| conv3x3s.append(nn.Sequential( |
| nn.Conv2d(num_inchannels[j], |
| num_outchannels_conv3x3, |
| 3, 2, 1, bias=False), |
| BatchNorm2d(num_outchannels_conv3x3, |
| momentum=BN_MOMENTUM), |
| nn.ReLU(inplace=True))) |
| fuse_layer.append(nn.Sequential(*conv3x3s)) |
| fuse_layers.append(nn.ModuleList(fuse_layer)) |
|
|
| return nn.ModuleList(fuse_layers) |
|
|
| def get_num_inchannels(self): |
| return self.num_inchannels |
|
|
| def forward(self, x): |
| if self.num_branches == 1: |
| return [self.branches[0](x[0])] |
|
|
| for i in range(self.num_branches): |
| x[i] = self.branches[i](x[i]) |
|
|
| x_fuse = [] |
| for i in range(len(self.fuse_layers)): |
| y = x[0] if i == 0 else self.fuse_layers[i][0](x[0]) |
| for j in range(1, self.num_branches): |
| if i == j: |
| y = y + x[j] |
| elif j > i: |
| y = y + F.interpolate( |
| self.fuse_layers[i][j](x[j]), |
| size=[x[i].shape[2], x[i].shape[3]], |
| mode='bilinear') |
| else: |
| y = y + self.fuse_layers[i][j](x[j]) |
| x_fuse.append(self.relu(y)) |
|
|
| return x_fuse |
|
|
| class HighResolutionNet(nn.Module): |
|
|
| def __init__(self, config, lines=False, **kwargs): |
| self.inplanes = 64 |
| self.lines = lines |
| extra = config['MODEL']['EXTRA'] |
| super(HighResolutionNet, self).__init__() |
|
|
| |
| self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=2, padding=1, |
| bias=False) |
| self.bn1 = BatchNorm2d(self.inplanes, momentum=BN_MOMENTUM) |
| self.conv2 = nn.Conv2d(self.inplanes, self.inplanes, kernel_size=3, stride=2, padding=1, |
| bias=False) |
| self.bn2 = BatchNorm2d(self.inplanes, momentum=BN_MOMENTUM) |
| self.relu = nn.ReLU(inplace=True) |
| self.sf = nn.Softmax(dim=1) |
| self.layer1 = self._make_layer(Bottleneck, 64, 64, 4) |
|
|
| self.stage2_cfg = extra['STAGE2'] |
| num_channels = self.stage2_cfg['NUM_CHANNELS'] |
| block = blocks_dict[self.stage2_cfg['BLOCK']] |
| num_channels = [ |
| num_channels[i] * block.expansion for i in range(len(num_channels))] |
| self.transition1 = self._make_transition_layer( |
| [256], num_channels) |
| self.stage2, pre_stage_channels = self._make_stage( |
| self.stage2_cfg, num_channels) |
|
|
| self.stage3_cfg = extra['STAGE3'] |
| num_channels = self.stage3_cfg['NUM_CHANNELS'] |
| block = blocks_dict[self.stage3_cfg['BLOCK']] |
| num_channels = [ |
| num_channels[i] * block.expansion for i in range(len(num_channels))] |
| self.transition2 = self._make_transition_layer( |
| pre_stage_channels, num_channels) |
| self.stage3, pre_stage_channels = self._make_stage( |
| self.stage3_cfg, num_channels) |
|
|
| self.stage4_cfg = extra['STAGE4'] |
| num_channels = self.stage4_cfg['NUM_CHANNELS'] |
| block = blocks_dict[self.stage4_cfg['BLOCK']] |
| num_channels = [ |
| num_channels[i] * block.expansion for i in range(len(num_channels))] |
| self.transition3 = self._make_transition_layer( |
| pre_stage_channels, num_channels) |
| self.stage4, pre_stage_channels = self._make_stage( |
| self.stage4_cfg, num_channels, multi_scale_output=True) |
|
|
| self.upsample = nn.Upsample(scale_factor=2, mode='nearest') |
| final_inp_channels = sum(pre_stage_channels) + self.inplanes |
|
|
| self.head = nn.Sequential(nn.Sequential( |
| nn.Conv2d( |
| in_channels=final_inp_channels, |
| out_channels=final_inp_channels, |
| kernel_size=1), |
| BatchNorm2d(final_inp_channels, momentum=BN_MOMENTUM), |
| nn.ReLU(inplace=True), |
| nn.Conv2d( |
| in_channels=final_inp_channels, |
| out_channels=config['MODEL']['NUM_JOINTS'], |
| kernel_size=extra['FINAL_CONV_KERNEL']), |
            nn.Softmax(dim=1) if not self.lines else nn.Sigmoid()))
|
|
|
|
|
|
| def _make_head(self, x, x_skip): |
| x = self.upsample(x) |
| x = torch.cat([x, x_skip], dim=1) |
| x = self.head(x) |
|
|
| return x |
|
|
| def _make_transition_layer( |
| self, num_channels_pre_layer, num_channels_cur_layer): |
| num_branches_cur = len(num_channels_cur_layer) |
| num_branches_pre = len(num_channels_pre_layer) |
|
|
| transition_layers = [] |
| for i in range(num_branches_cur): |
| if i < num_branches_pre: |
| if num_channels_cur_layer[i] != num_channels_pre_layer[i]: |
| transition_layers.append(nn.Sequential( |
| nn.Conv2d(num_channels_pre_layer[i], |
| num_channels_cur_layer[i], |
| 3, |
| 1, |
| 1, |
| bias=False), |
| BatchNorm2d( |
| num_channels_cur_layer[i], momentum=BN_MOMENTUM), |
| nn.ReLU(inplace=True))) |
| else: |
| transition_layers.append(None) |
| else: |
| conv3x3s = [] |
| for j in range(i + 1 - num_branches_pre): |
| inchannels = num_channels_pre_layer[-1] |
| outchannels = num_channels_cur_layer[i] \ |
| if j == i - num_branches_pre else inchannels |
| conv3x3s.append(nn.Sequential( |
| nn.Conv2d( |
| inchannels, outchannels, 3, 2, 1, bias=False), |
| BatchNorm2d(outchannels, momentum=BN_MOMENTUM), |
| nn.ReLU(inplace=True))) |
| transition_layers.append(nn.Sequential(*conv3x3s)) |
|
|
| return nn.ModuleList(transition_layers) |
|
|
| def _make_layer(self, block, inplanes, planes, blocks, stride=1): |
| downsample = None |
| if stride != 1 or inplanes != planes * block.expansion: |
| downsample = nn.Sequential( |
| nn.Conv2d(inplanes, planes * block.expansion, |
| kernel_size=1, stride=stride, bias=False), |
| BatchNorm2d(planes * block.expansion, momentum=BN_MOMENTUM), |
| ) |
|
|
| layers = [] |
| layers.append(block(inplanes, planes, stride, downsample)) |
| inplanes = planes * block.expansion |
| for i in range(1, blocks): |
| layers.append(block(inplanes, planes)) |
|
|
| return nn.Sequential(*layers) |
|
|
| def _make_stage(self, layer_config, num_inchannels, |
| multi_scale_output=True): |
| num_modules = layer_config['NUM_MODULES'] |
| num_branches = layer_config['NUM_BRANCHES'] |
| num_blocks = layer_config['NUM_BLOCKS'] |
| num_channels = layer_config['NUM_CHANNELS'] |
| block = blocks_dict[layer_config['BLOCK']] |
| fuse_method = layer_config['FUSE_METHOD'] |
|
|
| modules = [] |
| for i in range(num_modules): |
| |
| if not multi_scale_output and i == num_modules - 1: |
| reset_multi_scale_output = False |
| else: |
| reset_multi_scale_output = True |
| modules.append( |
| HighResolutionModule(num_branches, |
| block, |
| num_blocks, |
| num_inchannels, |
| num_channels, |
| fuse_method, |
| reset_multi_scale_output) |
| ) |
| num_inchannels = modules[-1].get_num_inchannels() |
|
|
| return nn.Sequential(*modules), num_inchannels |
|
|
| def forward(self, x): |
| |
| x = self.conv1(x) |
| x_skip = x.clone() |
| x = self.bn1(x) |
| x = self.relu(x) |
| x = self.conv2(x) |
| x = self.bn2(x) |
| x = self.relu(x) |
| x = self.layer1(x) |
|
|
| x_list = [] |
| for i in range(self.stage2_cfg['NUM_BRANCHES']): |
| if self.transition1[i] is not None: |
| x_list.append(self.transition1[i](x)) |
| else: |
| x_list.append(x) |
| y_list = self.stage2(x_list) |
|
|
| x_list = [] |
| for i in range(self.stage3_cfg['NUM_BRANCHES']): |
| if self.transition2[i] is not None: |
| x_list.append(self.transition2[i](y_list[-1])) |
| else: |
| x_list.append(y_list[i]) |
| y_list = self.stage3(x_list) |
|
|
| x_list = [] |
| for i in range(self.stage4_cfg['NUM_BRANCHES']): |
| if self.transition3[i] is not None: |
| x_list.append(self.transition3[i](y_list[-1])) |
| else: |
| x_list.append(y_list[i]) |
| x = self.stage4(x_list) |
|
|
| |
| height, width = x[0].size(2), x[0].size(3) |
| x1 = F.interpolate(x[1], size=(height, width), mode='bilinear', align_corners=False) |
| x2 = F.interpolate(x[2], size=(height, width), mode='bilinear', align_corners=False) |
| x3 = F.interpolate(x[3], size=(height, width), mode='bilinear', align_corners=False) |
| x = torch.cat([x[0], x1, x2, x3], 1) |
| x = self._make_head(x, x_skip) |
|
|
| return x |
|
|
| def init_weights(self, pretrained=''): |
| for m in self.modules(): |
| if isinstance(m, nn.Conv2d): |
                if not self.lines:
| nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') |
| else: |
| nn.init.normal_(m.weight, std=0.001) |
| |
| |
| elif isinstance(m, nn.BatchNorm2d): |
| nn.init.constant_(m.weight, 1) |
| nn.init.constant_(m.bias, 0) |
| if pretrained != '': |
| if os.path.isfile(pretrained): |
| pretrained_dict = torch.load(pretrained) |
| model_dict = self.state_dict() |
| pretrained_dict = {k: v for k, v in pretrained_dict.items() |
| if k in model_dict.keys()} |
| model_dict.update(pretrained_dict) |
| self.load_state_dict(model_dict) |
| else: |
| sys.exit(f'Weights {pretrained} not found.') |
|
|
| model = HighResolutionNet(config, **kwargs) |
| model.init_weights(pretrained) |
| return model |
| |
def load_kp_model(path, device):
    config_kp_path = path / 'hrnetv2_w48.yaml'
    with open(config_kp_path, 'r') as f:
        cfg_kp = yaml.safe_load(f)

    loaded_state_kp = torch.load(path / "keypoint_detect.pt", map_location=device, weights_only=False)
    model = get_cls_net(cfg_kp)
    model.load_state_dict(loaded_state_kp)
    model.to(device)
    model.eval()
    return model
|
|
def preprocess_batch_fast(frames):
    """Resize to 960x540, convert BGR->RGB, scale to [0, 1], and stack into an NCHW float tensor."""
    target_size = (540, 960)  # (height, width)
    batch = []
    for frame in frames:
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = cv2.resize(frame_rgb, (target_size[1], target_size[0]))
        img = img.astype(np.float32) / 255.0
        img = np.transpose(img, (2, 0, 1))  # HWC -> CHW
        batch.append(img)
    return torch.from_numpy(np.stack(batch)).float()
|
|
def extract_keypoints_from_heatmap_fast(heatmap: torch.Tensor, scale: int = 2, max_keypoints: int = 1):
    """Extract the top-k local maxima per heatmap channel as (x, y, score) triples."""
    batch_size, n_channels, height, width = heatmap.shape

    # 3x3 max-pool NMS: a pixel is a local maximum iff it equals the pooled value.
    max_pooled = F.max_pool2d(heatmap, 3, stride=1, padding=1)
    local_maxima = (max_pooled == heatmap)

    masked_heatmap = heatmap * local_maxima
    flat_heatmap = masked_heatmap.view(batch_size, n_channels, -1)
    scores, indices = torch.topk(flat_heatmap, max_keypoints, dim=-1, sorted=False)

    # Recover (x, y) from flat indices and rescale to input-image coordinates.
    y_coords = torch.div(indices, width, rounding_mode="floor") * scale
    x_coords = (indices % width) * scale

    results = torch.stack([x_coords.float(), y_coords.float(), scores], dim=-1)
    return results
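# Example (illustrative sketch): a single peak at heatmap cell (y=1, x=3) is
# returned as (x, y, score) scaled by 2 to input-image coordinates.
#   >>> hm = torch.zeros(1, 1, 4, 4)
#   >>> hm[0, 0, 1, 3] = 1.0
#   >>> extract_keypoints_from_heatmap_fast(hm, scale=2)[0, 0, 0]
#   tensor([6., 2., 1.])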
|
|
def process_keypoints_vectorized(kp_coords, kp_threshold, w, h, batch_size):
    """Convert raw (x, y, score) tensors into per-frame dicts of normalized keypoints (1-based channel keys)."""
| batch_results = [] |
| |
| |
| kp_np = kp_coords.cpu().numpy() |
| |
| for batch_idx in range(batch_size): |
| kp_dict = {} |
| |
| valid_kps = kp_np[batch_idx, :, 0, 2] > kp_threshold |
| valid_indices = np.where(valid_kps)[0] |
| |
| for ch_idx in valid_indices: |
| x = float(kp_np[batch_idx, ch_idx, 0, 0]) / w |
| y = float(kp_np[batch_idx, ch_idx, 0, 1]) / h |
| p = float(kp_np[batch_idx, ch_idx, 0, 2]) |
| kp_dict[ch_idx + 1] = {'x': x, 'y': y, 'p': p} |
| |
| batch_results.append(kp_dict) |
| |
| return batch_results |
|
|
def inference_batch(frames, model, kp_threshold, device, batch_size=8):
    """Batched keypoint inference; the model's actual device takes precedence over `device`."""
| results = [] |
| num_frames = len(frames) |
| |
| |
| model_device = next(model.parameters()).device |
| |
| |
| for i in range(0, num_frames, batch_size): |
| current_batch_size = min(batch_size, num_frames - i) |
| batch_frames = frames[i:i + current_batch_size] |
| |
| |
| batch = preprocess_batch_fast(batch_frames) |
| b, c, h, w = batch.size() |
| |
| |
| batch = batch.to(model_device) |
|
|
| with torch.inference_mode(): |
| heatmaps = model(batch) |
|
|
| |
        # Drop the last heatmap channel (background) before peak extraction.
        kp_coords = extract_keypoints_from_heatmap_fast(heatmaps[:, :-1, :, :], scale=2, max_keypoints=1)
| |
| |
| batch_results = process_keypoints_vectorized(kp_coords, kp_threshold, 960, 540, current_batch_size) |
| results.extend(batch_results) |
| |
| del heatmaps, kp_coords, batch, batch_results, batch_frames |
|
|
| return results |
|
|
# Maps raw model channel indices to the 32-point template keypoint order.
map_keypoints = {
| 1: 1, 2: 14, 3: 25, 4: 2, 5: 10, 6: 18, 7: 26, 8: 3, 9: 7, 10: 23, |
| 11: 27, 20: 4, 21: 8, 22: 24, 23: 28, 24: 5, 25: 13, 26: 21, 27: 29, |
| 28: 6, 29: 17, 30: 30, 31: 11, 32: 15, 33: 19, 34: 12, 35: 16, 36: 20, |
| 45: 9, 50: 31, 52: 32, 57: 22 |
| } |
| def get_mapped_keypoints(kp_points): |
| """Apply keypoint mapping to detection results""" |
| mapped_points = {} |
| for key, value in kp_points.items(): |
| if key in map_keypoints: |
| mapped_key = map_keypoints[key] |
| mapped_points[mapped_key] = value |
| |
| |
| |
| return mapped_points |
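# Example (illustrative sketch): mapped channels are renumbered, unmapped channels are dropped.
#   >>> get_mapped_keypoints({2: {'x': 0.5, 'y': 0.5, 'p': 0.9}, 99: {'x': 0.1, 'y': 0.1, 'p': 0.4}})
#   {14: {'x': 0.5, 'y': 0.5, 'p': 0.9}}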
|
|
| def process_batch_input(frames, model, kp_threshold, device='cpu', batch_size=16): |
| """Process multiple input images in batch""" |
| |
| kp_results = inference_batch(frames, model, kp_threshold, device, batch_size) |
| kp_results = [get_mapped_keypoints(kp) for kp in kp_results] |
|
|
| return kp_results |
|
|
|
|
|
|
| def normalize_keypoints(keypoints_result, batch_images, n_keypoints): |
| keypoints = [] |
| if keypoints_result is not None and len(keypoints_result) > 0: |
| for frame_number_in_batch, kp_dict in enumerate(keypoints_result): |
| if frame_number_in_batch >= len(batch_images): |
| break |
| frame_keypoints: List[Tuple[int, int]] = [] |
| try: |
| height, width = batch_images[frame_number_in_batch].shape[:2] |
| if kp_dict is not None and isinstance(kp_dict, dict): |
| for idx in range(32): |
                        x, y = 0, 0
| kp_idx = idx + 1 |
| if kp_idx in kp_dict: |
| try: |
| kp_data = kp_dict[kp_idx] |
| if isinstance(kp_data, dict) and "x" in kp_data and "y" in kp_data: |
| x = int(kp_data["x"] * width) |
| y = int(kp_data["y"] * height) |
                            except Exception:
                                pass  # malformed keypoint entry; keep (0, 0)
| frame_keypoints.append((x, y)) |
| except (IndexError, ValueError, AttributeError): |
| frame_keypoints = [(0, 0)] * 32 |
| if len(frame_keypoints) < n_keypoints: |
| frame_keypoints.extend([(0, 0)] * (n_keypoints - len(frame_keypoints))) |
| else: |
| frame_keypoints = frame_keypoints[:n_keypoints] |
| keypoints.append(frame_keypoints) |
| return keypoints |
|
|
def fix_keypoints(frame_keypoints: list[tuple[int, int]], n_keypoints: int) -> list[tuple[int, int]]:
    """Pad/trim to n_keypoints, then apply heuristic swaps for commonly confused pitch keypoints."""
| if len(frame_keypoints) < n_keypoints: |
| frame_keypoints += [(0, 0)] * (n_keypoints - len(frame_keypoints)) |
| elif len(frame_keypoints) > n_keypoints: |
| frame_keypoints = frame_keypoints[:n_keypoints] |
|
|
    # Heuristic fixes: when a neighbouring keypoint channel fires instead of the
    # expected one, shift the detection over and clear the wrong slot.
    if frame_keypoints[2] != (0, 0) and frame_keypoints[4] != (0, 0) and frame_keypoints[3] == (0, 0):
        frame_keypoints[3] = frame_keypoints[4]
        frame_keypoints[4] = (0, 0)

    if frame_keypoints[0] != (0, 0) and frame_keypoints[4] != (0, 0) and frame_keypoints[1] == (0, 0):
        frame_keypoints[1] = frame_keypoints[4]
        frame_keypoints[4] = (0, 0)

    if frame_keypoints[2] != (0, 0) and frame_keypoints[3] != (0, 0) and frame_keypoints[1] == (0, 0) and frame_keypoints[3][0] > frame_keypoints[2][0]:
        frame_keypoints[1] = frame_keypoints[3]
        frame_keypoints[3] = (0, 0)

    if frame_keypoints[28] != (0, 0) and frame_keypoints[25] == (0, 0) and frame_keypoints[26] != (0, 0) and frame_keypoints[26][0] > frame_keypoints[28][0]:
        frame_keypoints[25] = frame_keypoints[28]
        frame_keypoints[28] = (0, 0)

    if frame_keypoints[24] != (0, 0) and frame_keypoints[28] != (0, 0) and frame_keypoints[25] == (0, 0):
        frame_keypoints[25] = frame_keypoints[28]
        frame_keypoints[28] = (0, 0)

    if frame_keypoints[24] != (0, 0) and frame_keypoints[27] != (0, 0) and frame_keypoints[26] == (0, 0):
        frame_keypoints[26] = frame_keypoints[27]
        frame_keypoints[27] = (0, 0)

    if frame_keypoints[28] != (0, 0) and frame_keypoints[23] == (0, 0) and frame_keypoints[20] != (0, 0) and frame_keypoints[20][1] > frame_keypoints[23][1]:
        frame_keypoints[23] = frame_keypoints[20]
        frame_keypoints[20] = (0, 0)

    return frame_keypoints
|
|
| def challenge_template(path_hf_repo) -> ndarray: |
| return imread(f"{path_hf_repo}/football_pitch_template.png") |
|
|
| current_path = str(os.path.dirname(os.path.abspath(__file__))) |
| template_image = challenge_template(current_path) |
| template_image_gray = cvtColor(template_image, COLOR_BGR2GRAY) |
| _sparse_template_cache: dict[tuple[int, int], list[tuple[int, int]]] = {} |
| _shared_eval_executor: ThreadPoolExecutor | None = None |
|
|
| class MaxSizeCache(OrderedDict): |
| """ |
| Fixed-size dictionary behaving like a deque(maxlen=N). |
| Stores key–value pairs with FIFO eviction. |
| """ |
|
|
| def __init__(self, maxlen=500): |
| super().__init__() |
| self.maxlen = maxlen |
| self._lock = threading.Lock() |
|
|
| def set(self, key, value): |
| """Insert or update an item. Evicts oldest if full.""" |
| with self._lock: |
| if key in self: |
| del self[key] |
| super().__setitem__(key, value) |
|
|
| if len(self) > self.maxlen: |
| self.popitem(last=False) |
|
|
| def get(self, key, default=None): |
| """Retrieve an item without changing order.""" |
| with self._lock: |
| return super().get(key, default) |
|
|
| def exists(self, key): |
| """Check if a key exists.""" |
| with self._lock: |
| return key in self |
|
|
| def load(self, data_dict): |
| """ |
| Load initial data into cache. |
| Oldest items evicted if data exceeds maxlen. |
| """ |
| for k, v in data_dict.items(): |
| self.set(k, v) |
|
|
| def __repr__(self): |
| return f"MaxSizeCache(maxlen={self.maxlen}, data={dict(self)})" |
| cached = MaxSizeCache() |
| _per_key_locks = defaultdict(threading.Lock) |
|
|
| def get_or_compute_masks(key, compute_fn): |
| lock = _per_key_locks[key] |
| with lock: |
| if cached.exists(key): |
| return cached.get(key) |
| |
| masks = compute_fn() |
| cached.set(key, masks) |
| return masks |
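# Example (illustrative sketch): compute_fn runs once per key; repeat calls are
# served from the cache (note this writes into the module-level `cached`).
#   >>> calls = []
#   >>> get_or_compute_masks("demo", lambda: calls.append(1) or "masks")
#   'masks'
#   >>> get_or_compute_masks("demo", lambda: calls.append(1) or "masks")
#   'masks'
#   >>> len(calls)
#   1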
| |
| INDEX_KEYPOINT_CORNER_BOTTOM_LEFT = 5 |
| INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT = 29 |
| INDEX_KEYPOINT_CORNER_TOP_LEFT = 0 |
| INDEX_KEYPOINT_CORNER_TOP_RIGHT = 24 |
|
|
# Template-pitch keypoint coordinates (x, y) on the 1045x675 template image.
KEYPOINTS: list[tuple[int, int]] = [
| (5, 5), |
| (5, 140), |
| (5, 250), |
| (5, 430), |
| (5, 540), |
| (5, 675), |
| |
| (55, 250), |
| (55, 430), |
| |
| (110, 340), |
| |
| (165, 140), |
| (165, 270), |
| (165, 410), |
| (165, 540), |
| |
| (527, 5), |
| (527, 253), |
| (527, 433), |
| (527, 675), |
| |
| (888, 140), |
| (888, 270), |
| (888, 410), |
| (888, 540), |
| |
| (940, 340), |
| |
| (998, 250), |
| (998, 430), |
| |
| (1045, 5), |
| (1045, 140), |
| (1045, 250), |
| (1045, 430), |
| (1045, 540), |
| (1045, 675), |
| |
| (435, 340), |
| (615, 340), |
| ] |
|
|
| KEYPOINTS_NP = np.asarray(KEYPOINTS, dtype=np.float32) |
|
|
# Sparse template used by _generate_sparse_template_keypoints: only a few
# centre-area points are non-zero.
FOOTBALL_KEYPOINTS: list[tuple[int, int]] = [
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
| |
| (0, 0), |
| (0, 0), |
| (0, 0), |
|
|
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
|
|
| (0, 0), |
| (527, 283), |
| (527, 403), |
| (0, 0), |
|
|
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
|
|
| (0, 0), |
|
|
| (0, 0), |
| (0, 0), |
|
|
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
| (0, 0), |
|
|
| (405, 340), |
| (645, 340), |
| ] |
|
|
| FOOTBALL_KEYPOINTS_NP = np.asarray(FOOTBALL_KEYPOINTS, dtype=np.float32) |
|
|
# For each of the 32 template keypoints (1-based), the indices of four related keypoints.
groups = {
| 1: [2, 3, 7, 10], |
| 2: [1, 3, 7, 10], |
| 3: [2, 4, 7, 8], |
| 4: [3, 5, 8, 7], |
| 5: [4, 8, 6, 3], |
| 6: [5, 4, 8, 13], |
| 7: [3, 8, 9, 10], |
| 8: [4, 7, 9, 13], |
| 9: [7, 8, 11, 12], |
| 10: [9, 11, 7, 2], |
| 11: [9, 10, 12, 31], |
| 12: [9, 11, 13, 31], |
| 13: [9, 12, 8, 5], |
| 14: [15, 31, 32, 16], |
| 15: [31, 16, 32, 14], |
| 16: [31, 15, 32, 17], |
| 17: [31, 16, 32, 15], |
| 18: [19, 22, 23, 26], |
| 19: [18, 22, 20, 32], |
| 20: [19, 22, 21, 32], |
| 21: [20, 22, 24, 29], |
| 22: [23, 24, 19, 20], |
| 23: [27, 24, 22, 28], |
| 24: [28, 23, 22, 27], |
| 25: [26, 27, 23, 18], |
| 26: [25, 27, 23, 18], |
| 27: [26, 23, 28, 24], |
| 28: [27, 24, 29, 23], |
| 29: [28, 30, 24, 21], |
| 30: [29, 28, 24, 21], |
| 31: [15, 16, 32, 14], |
| 32: [15, 31, 16, 14] |
| } |
|
|
| base_temps = [(0, 0)] * 32 |
|
|
| _TEMPLATE_MAX_X: int = 1045 |
| _TEMPLATE_MAX_Y: int = 675 |
|
|
| |
# 0-based variant of `groups` for fast numpy indexing.
GROUPS_ARRAY = [np.asarray(groups[i], dtype=np.int32) - 1 for i in range(1, 33)]
|
|
| kernel = getStructuringElement(MORPH_RECT, (31, 31)) |
| dilate_kernel = getStructuringElement( |
| MORPH_RECT, (3, 3) |
| ) |
|
|
| class InvalidMask(Exception): |
| pass |
|
|
def has_a_wide_line(mask: ndarray, max_aspect_ratio: float = 1.0) -> bool:
    """True if any contour's bounding box is too square (min/max side ratio >= max_aspect_ratio) to be a thin line."""
    contours, _ = findContours(mask, RETR_EXTERNAL, CHAIN_APPROX_SIMPLE)
| for cnt in contours: |
| x, y, w, h = boundingRect(cnt) |
| |
| if w == 0 or h == 0: |
| continue |
| aspect_ratio = min(w, h) / max(w, h) |
| if aspect_ratio >= max_aspect_ratio: |
| return True |
| return False |
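# Example (illustrative sketch): a square blob (aspect ratio 1.0) is flagged as too wide.
#   >>> blob = np.zeros((10, 10), dtype=np.uint8); blob[2:8, 2:8] = 255
#   >>> has_a_wide_line(blob)
#   True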
|
|
def is_bowtie(points: ndarray) -> bool:
    """True if the quadrilateral self-intersects (opposite edges cross, forming a 'bowtie')."""
    def segments_intersect(p1, p2, q1, q2) -> bool:
        def ccw(a, b, c):
            return (c[1] - a[1]) * (b[0] - a[0]) > (b[1] - a[1]) * (c[0] - a[0])
|
|
| return (ccw(p1, q1, q2) != ccw(p2, q1, q2)) and ( |
| ccw(p1, p2, q1) != ccw(p1, p2, q2) |
| ) |
|
|
| pts = points.reshape(-1, 2) |
| edges = [(pts[0], pts[1]), (pts[1], pts[2]), (pts[2], pts[3]), (pts[3], pts[0])] |
| return segments_intersect(*edges[0], *edges[2]) or segments_intersect( |
| *edges[1], *edges[3] |
| ) |
|
|
| def validate_mask_lines(mask: ndarray) -> None: |
| |
| nonzero_count = countNonZero(mask) |
| if nonzero_count == 0: |
| raise InvalidMask("No projected lines") |
| if nonzero_count == mask.size: |
| raise InvalidMask("Projected lines cover the entire image surface") |
| |
| if has_a_wide_line(mask=mask): |
| raise InvalidMask("A projected line is too wide") |
|
|
| def validate_mask_ground(mask: ndarray) -> None: |
| num_labels, _ = connectedComponents(mask) |
| num_distinct_regions = num_labels - 1 |
| if num_distinct_regions > 1: |
| raise InvalidMask( |
| f"Projected ground should be a single object, detected {num_distinct_regions}" |
| ) |
    area_covered = mask.sum() / mask.size
    if area_covered >= 0.9:
        raise InvalidMask(
            f"Projected ground covers {area_covered:.0%} of the image surface which is unrealistic"
        )
|
|
| def validate_projected_corners( |
| source_keypoints: list[tuple[int, int]], homography_matrix: ndarray |
| ) -> None: |
| |
| corner_indices = np.array([ |
| INDEX_KEYPOINT_CORNER_BOTTOM_LEFT, |
| INDEX_KEYPOINT_CORNER_BOTTOM_RIGHT, |
| INDEX_KEYPOINT_CORNER_TOP_RIGHT, |
| INDEX_KEYPOINT_CORNER_TOP_LEFT |
| ], dtype=np.int32) |
| |
| |
| if isinstance(source_keypoints, np.ndarray): |
| src_corners = source_keypoints[corner_indices] |
| else: |
| src_arr = np.array(source_keypoints, dtype=np.float32) |
| src_corners = src_arr[corner_indices] |
| |
| src_corners = src_corners[None, :, :] |
| warped_corners = perspectiveTransform(src_corners, homography_matrix)[0] |
|
|
| if is_bowtie(warped_corners): |
| raise InvalidMask("Projection twisted!") |
|
|
def project_image_using_keypoints(
    image: ndarray,
    source_keypoints: list[tuple[int, int]],
    destination_keypoints: list[tuple[int, int]],
    destination_width: int,
    destination_height: int,
    inverse: bool = False,  # NOTE: currently unused
) -> ndarray:
    src_arr = np.array(source_keypoints, dtype=np.float32)
    dst_arr = np.array(destination_keypoints, dtype=np.float32)

    # Keypoints at (0, 0) mark "not detected" and are excluded from the fit.
    valid_mask = ~((dst_arr[:, 0] == 0) & (dst_arr[:, 1] == 0))
| |
| source_points = src_arr[valid_mask] |
| destination_points = dst_arr[valid_mask] |
|
|
| H, _ = findHomography(source_points, destination_points) |
| if H is None: |
| raise InvalidMask("Homography not found") |
| validate_projected_corners(source_keypoints=source_keypoints, homography_matrix=H) |
| |
| projected_image = warpPerspective(image, H, (destination_width, destination_height)) |
|
|
| return projected_image |
|
|
def extract_masks_for_ground_and_lines(image: ndarray) -> tuple[ndarray, ndarray]:
    """Assumes a template coloured s.t. ground = gray, lines = white, background = black."""
    gray = image

    _, mask_ground = threshold(gray, 10, 1, THRESH_BINARY)

    total_pixels = mask_ground.size
    ground_nonzero = int(countNonZero(mask_ground))
    if ground_nonzero == 0:
        raise InvalidMask("No projected ground")

    # A ground mask that exactly fills its bounding rectangle means the
    # homography degenerated into an axis-aligned rectangle.
    x, y, w, h = cv2.boundingRect(cv2.findNonZero(mask_ground))
    if ground_nonzero == w * h:
        raise InvalidMask("Projected ground should not be rectangular")

    area_covered = ground_nonzero / float(total_pixels)
    if area_covered >= 0.9:
        raise InvalidMask(f"Projected ground covers {area_covered:.0%} of the image surface which is unrealistic")
|
|
| validate_mask_ground(mask=mask_ground) |
|
|
| _, mask_lines = threshold(gray, 200, 1, THRESH_BINARY) |
| validate_mask_lines(mask=mask_lines) |
| return mask_ground, mask_lines |
|
|
|
|
def get_edge_mask(x, y, W, H, t):
    """Bitmask of image borders within t px (1=left, 2=right, 4=top, 8=bottom); bit ops beat sets for speed."""
    mask = 0
    if x <= t: mask |= 1
    if x >= W - t: mask |= 2
    if y <= t: mask |= 4
    if y >= H - t: mask |= 8
    return mask
|
|
| def both_points_same_direction_fast(A, B, W, H, t=100): |
| mask_a = get_edge_mask(A[0], A[1], W, H, t) |
| if mask_a == 0: return False |
| |
| mask_b = get_edge_mask(B[0], B[1], W, H, t) |
| if mask_b == 0: return False |
| |
| |
| return (mask_a & mask_b) != 0 |
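# Example (illustrative sketch): two points hugging the same (left) border share
# bit 1, so the pair is flagged as degenerate.
#   >>> get_edge_mask(10, 300, W=1920, H=1080, t=100)
#   1
#   >>> both_points_same_direction_fast((10, 300), (50, 900), 1920, 1080)
#   True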
|
|
def canonical(obj):
    """Recursively convert arrays and containers into nested, hashable tuples."""
    if isinstance(obj, np.ndarray):
        return canonical(obj.tolist())
|
|
| |
| if isinstance(obj, (list, tuple)): |
| return tuple(canonical(x) for x in obj) |
|
|
| |
| if isinstance(obj, set): |
| return tuple(sorted(canonical(x) for x in obj)) |
|
|
| |
| if isinstance(obj, dict): |
| return tuple((k, canonical(v)) for k, v in sorted(obj.items())) |
|
|
| return obj |
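# Example (illustrative sketch): arrays and dicts collapse into nested, hashable tuples.
#   >>> canonical([np.array([1, 2]), {"b": 2, "a": 1}])
#   ((1, 2), (('a', 1), ('b', 2)))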
|
|
def fast_cache_key(frame_keypoints, w, h):
    """Hashable cache key: the keypoints' raw int32 bytes plus the frame size."""
| if isinstance(frame_keypoints, np.ndarray): |
| if frame_keypoints.dtype == np.int32: |
| arr = frame_keypoints |
| else: |
| arr = frame_keypoints.astype(np.int32) |
| else: |
| arr = np.asarray(frame_keypoints, dtype=np.int32) |
| return (arr.tobytes(), int(w), int(h)) |
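# Example (illustrative sketch): equivalent list and int32-array inputs produce identical keys.
#   >>> fast_cache_key([(1, 2)], 960, 540) == fast_cache_key(np.array([[1, 2]], dtype=np.int32), 960, 540)
#   True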
|
|
# 1-based keypoint-index subsets: when every detected keypoint falls inside one
# of these sets and the set's first two points hug the same image border, the
# projection is skipped as unreliable.
blacklists = [
| [23, 24, 27, 28], |
| [7, 8, 3, 4], |
| [2, 10, 1, 14], |
| [18, 26, 14, 25], |
| [5, 13, 6, 17], |
| [21, 29, 17, 30], |
| [10, 11, 2, 3], |
| [10, 11, 2, 7], |
| [12, 13, 4, 5], |
| [12, 13, 5, 8], |
| [18, 19, 26, 27], |
| [18, 19, 26, 23], |
| [20, 21, 24, 29], |
| [20, 21, 28, 29], |
| [8, 4, 5, 13], |
| [3, 7, 2, 10], |
| [23, 27, 18, 26], |
| [24, 28, 21, 29] |
| ] |
|
|
| prepared_blacklists = [(set(bl), bl[0]-1, bl[1]-1) for bl in blacklists] |
|
|
def evaluate_keypoints_for_frame(
    frame_keypoints: list[tuple[int, int]],
    frame_index,
    h,
    w,
    precomputed_key=None,
) -> float:
    """Score how well the projected template lines overlap the frame's edge image (0.0 = invalid)."""
    key = precomputed_key or fast_cache_key(frame_keypoints, w, h)
    template_keypoints = KEYPOINTS
    floor_markings_template = template_image_gray

    try:
| |
| def compute_masks_for_key(frame_keypoints, w, h): |
| try: |
| non_idxs_set = {i + 1 for i, kpt in enumerate(frame_keypoints) if kpt[0] != 0 or kpt[1] != 0} |
| for bl_set, idx0, idx1 in prepared_blacklists: |
| if non_idxs_set.issubset(bl_set): |
| if both_points_same_direction_fast(frame_keypoints[idx0], frame_keypoints[idx1], w, h): |
| return None, 0, None |
| |
| warped_template = project_image_using_keypoints( |
| image=floor_markings_template, |
| source_keypoints=template_keypoints, |
| destination_keypoints=frame_keypoints, |
| destination_width=w, |
| destination_height=h, |
| ) |
| mask_ground, mask_lines_expected = extract_masks_for_ground_and_lines( |
| image=warped_template |
| ) |
| mask_expected_on_ground = mask_lines_expected |
| |
| ys, xs = np.where(mask_lines_expected == 1) |
|
|
| if len(xs) == 0: |
| bbox = None |
| else: |
| min_x = xs.min() |
| max_x = xs.max() |
| min_y = ys.min() |
| max_y = ys.max() |
| bbox = (min_x, min_y, max_x, max_y) |
| bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) if bbox is not None else 1 |
| frame_area = h * w |
| |
| if (bbox_area / frame_area) < 0.2: |
| return None, 0, None |
| |
| pixels_on_lines = int(countNonZero(mask_expected_on_ground)) |
| return mask_expected_on_ground, pixels_on_lines, mask_ground |
            except Exception:
                return None, 0, None
| |
| mask_expected_on_ground, pixels_on_lines, mask_ground = get_or_compute_masks( |
| key, lambda: compute_masks_for_key(frame_keypoints, w, h) |
| ) |
| if mask_expected_on_ground is None or pixels_on_lines == 0 or mask_ground is None: |
| return 0.0 |
|
|
        # check_frame: module-level map of frame_index -> precomputed edge image
        # (e.g. Canny output), populated elsewhere in the pipeline.
        image_edges = check_frame[frame_index]
|
|
| h, w = mask_expected_on_ground.shape[:2] |
| work_buffer = np.zeros((h, w), dtype=np.uint8) |
| bitwise_and( |
| image_edges, |
| image_edges, |
| dst=work_buffer, |
| mask=mask_ground |
| ) |
| dilate(work_buffer, dilate_kernel, dst=work_buffer, iterations=3) |
| threshold(work_buffer, 0, 255, cv2.THRESH_BINARY, dst=work_buffer) |
| pixels_predicted_count = countNonZero(work_buffer) |
| bitwise_and(work_buffer, mask_expected_on_ground, dst=work_buffer) |
| pixels_overlapping = countNonZero(work_buffer) |
| pixels_rest = pixels_predicted_count - pixels_overlapping |
| total_pixels = pixels_predicted_count + pixels_on_lines - pixels_overlapping |
| if total_pixels > 0 and (pixels_rest / total_pixels) > 0.9: |
| return 0.0 |
| score = pixels_overlapping / (pixels_on_lines + 1e-8) |
| return score |
    except Exception:
        pass
| return 0.0 |
|
|
| def _generate_sparse_template_keypoints(frame_width: int, frame_height: int) -> list[tuple[int, int]]: |
| key = (int(frame_width), int(frame_height)) |
| if key in _sparse_template_cache: |
| return _sparse_template_cache[key] |
    template_max_x, template_max_y = _TEMPLATE_MAX_X, _TEMPLATE_MAX_Y
| sx = float(frame_width) / float(template_max_x if template_max_x != 0 else 1) |
| sy = float(frame_height) / float(template_max_y if template_max_y != 0 else 1) |
| |
| scale_factors = np.array([sx, sy], dtype=np.float32) |
| scaled_np = np.round(FOOTBALL_KEYPOINTS_NP * scale_factors).astype(np.int32) |
| scaled = [(int(x), int(y)) for x, y in scaled_np] |
| _sparse_template_cache[key] = scaled |
| return scaled |
|
|
| def convert_keypoints_to_val_format(keypoints): |
| |
| if not keypoints: |
| return [] |
| arr = np.asarray(keypoints, dtype=np.int32) |
| return [(int(x), int(y)) for x, y in arr] |
|
|
|
|
def are_collinear(pts, eps=1e-9):
    """True if the first three points are (nearly) collinear; fewer than three points count as collinear."""
    pts = np.asarray(pts)
    if len(pts) < 3:
        return True
    a, b, c = pts[:3]
    area = np.abs(np.cross(b - a, c - a))
    return area < eps
|
|
| def line_to_line_transform(P1, P2, Q1, Q2): |
| """ |
| Compute 2D affine transformation mapping line segment P1P2 -> Q1Q2 |
| Optimized version reducing allocations. |
| |
| Parameters: |
| P1, P2: source points (x, y) |
| Q1, Q2: target points (x, y) |
| |
| Returns: |
| M: 3x3 homogeneous transformation matrix |
| """ |
| P1 = np.asarray(P1, dtype=np.float64) |
| P2 = np.asarray(P2, dtype=np.float64) |
| Q1 = np.asarray(Q1, dtype=np.float64) |
| Q2 = np.asarray(Q2, dtype=np.float64) |
| |
| |
| v_s = P2 - P1 |
| v_t = Q2 - Q1 |
| |
| |
| norm_s = np.hypot(v_s[0], v_s[1]) |
| norm_t = np.hypot(v_t[0], v_t[1]) |
| s = norm_t / norm_s |
| |
| |
| theta = np.arctan2(v_t[1], v_t[0]) - np.arctan2(v_s[1], v_s[0]) |
| |
| |
| cos_theta = np.cos(theta) |
| sin_theta = np.sin(theta) |
| |
| |
| sr00 = s * cos_theta |
| sr01 = -s * sin_theta |
| sr10 = s * sin_theta |
| sr11 = s * cos_theta |
| |
| |
| t0 = Q1[0] - (sr00 * P1[0] + sr01 * P1[1]) |
| t1 = Q1[1] - (sr10 * P1[0] + sr11 * P1[1]) |
| |
| |
| M = np.array([ |
| [sr00, sr01, t0], |
| [sr10, sr11, t1], |
| [0.0, 0.0, 1.0] |
| ], dtype=np.float64) |
| |
| return M |
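| # Illustrative example (not part of the pipeline): mapping the segment |
| # (0,0)-(1,0) onto (2,3)-(2,5) scales by 2 and rotates 90 degrees: |
| # M = line_to_line_transform((0, 0), (1, 0), (2, 3), (2, 5)) |
| # apply_transform(M, (1, 0)) -> (2, 5) |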
|
|
| def three_point_affine(P, Q): |
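| """ |
| Least-squares affine fit mapping points P -> Q as a 3x3 homogeneous |
| matrix; exact for 3 non-collinear correspondences. For example, |
| P=[(0,0),(1,0),(0,1)] and Q=[(0,0),(2,0),(0,2)] yields a uniform |
| scale by 2. |
| """ |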
| P = np.array(P, dtype=np.float64) |
| Q = np.array(Q, dtype=np.float64) |
| n = P.shape[0] |
| |
| |
| x, y = P[:, 0], P[:, 1] |
| u, v = Q[:, 0], Q[:, 1] |
| |
| |
| A = np.zeros((2*n, 6), dtype=np.float64) |
| A[0::2, 0] = x |
| A[0::2, 1] = y |
| A[0::2, 2] = 1 |
| A[1::2, 3] = x |
| A[1::2, 4] = y |
| A[1::2, 5] = 1 |
| |
| |
| b = np.empty(2*n, dtype=np.float64) |
| b[0::2] = u |
| b[1::2] = v |
| |
| |
| params, _, _, _ = np.linalg.lstsq(A, b, rcond=None) |
| a, b_, e, c, d, f = params |
| |
| |
| M = np.array([ |
| [a, b_, e], |
| [c, d, f], |
| [0, 0, 1] |
| ], dtype=np.float64) |
| |
| return M |
|
|
| def affine_from_4_points(src_pts, dst_pts): |
| """ |
| Compute a 2D affine transformation from 4 source points to 4 target points using least-squares. |
| Vectorized version for better performance. |
| |
| Parameters: |
| src_pts: list of 4 source points [(x1,y1),..., (x4,y4)] |
| dst_pts: list of 4 target points [(u1,v1),..., (u4,v4)] |
| |
| Returns: |
| 3x3 homogeneous affine transformation matrix |
| """ |
| P = np.array(src_pts, dtype=np.float64) |
| Q = np.array(dst_pts, dtype=np.float64) |
| |
| |
| x, y = P[:, 0], P[:, 1] |
| u, v = Q[:, 0], Q[:, 1] |
| |
| A = np.zeros((8, 6), dtype=np.float64) |
| A[0::2, 0] = x |
| A[0::2, 1] = y |
| A[0::2, 2] = 1 |
| A[1::2, 3] = x |
| A[1::2, 4] = y |
| A[1::2, 5] = 1 |
| |
| b = np.empty(8, dtype=np.float64) |
| b[0::2] = u |
| b[1::2] = v |
|
|
| |
| params, _, _, _ = np.linalg.lstsq(A, b, rcond=None) |
| a, b_, e, c, d, f = params |
|
|
| |
| M = np.array([ |
| [a, b_, e], |
| [c, d, f], |
| [0, 0, 1] |
| ], dtype=np.float64) |
| return M |
|
|
| def four_point_homography(src_pts, dst_pts): |
| """ |
| Compute 2D homography mapping 4 source points to 4 target points. |
| Vectorized version for better performance. |
| |
| src_pts: list of 4 source points [(x1,y1),..., (x4,y4)] |
| dst_pts: list of 4 target points [(u1,v1),..., (u4,v4)] |
| |
| Returns: |
| 3x3 homography matrix |
| """ |
| |
| src = np.array(src_pts, dtype=np.float64) |
| dst = np.array(dst_pts, dtype=np.float64) |
| |
| x, y = src[:, 0], src[:, 1] |
| u, v = dst[:, 0], dst[:, 1] |
| |
| |
| A = np.zeros((8, 9), dtype=np.float64) |
| A[0::2, 0] = -x |
| A[0::2, 1] = -y |
| A[0::2, 2] = -1 |
| A[0::2, 6] = x * u |
| A[0::2, 7] = y * u |
| A[0::2, 8] = u |
| |
| A[1::2, 3] = -x |
| A[1::2, 4] = -y |
| A[1::2, 5] = -1 |
| A[1::2, 6] = x * v |
| A[1::2, 7] = y * v |
| A[1::2, 8] = v |
| |
| |
| _, _, Vt = np.linalg.svd(A) |
| h = Vt[-1, :] |
| H = h.reshape(3, 3) |
| |
| |
| H /= H[2, 2] |
| return H |
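| # Sanity check (illustrative, not part of the pipeline): mapping the |
| # unit square onto itself recovers the identity up to floating point: |
| # H = four_point_homography([(0, 0), (1, 0), (1, 1), (0, 1)], |
| # [(0, 0), (1, 0), (1, 1), (0, 1)]) |
| # np.allclose(H, np.eye(3)) -> True |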
|
|
| def unique_points(src, dst): |
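| """ |
| Drop correspondence pairs where either point is the all-zero |
| placeholder, then deduplicate by source point (first occurrence wins). |
| Returns (src, dst) float arrays, both possibly empty. |
| """ |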
| src, dst = np.asarray(src, float), np.asarray(dst, float) |
| |
| src_nonzero = ~np.all(np.abs(src) < 1e-9, axis=1) |
| dst_nonzero = ~np.all(np.abs(dst) < 1e-9, axis=1) |
| valid_mask = src_nonzero & dst_nonzero |
| |
| if not valid_mask.any(): |
| return np.array([]), np.array([]) |
| |
| src_valid = src[valid_mask] |
| dst_valid = dst[valid_mask] |
| |
| |
| _, unique_idx = np.unique(src_valid, axis=0, return_index=True) |
| unique_idx.sort() |
| |
| return src_valid[unique_idx], dst_valid[unique_idx] |
|
|
| def robust_transform(src_pts, dst_pts): |
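| """ |
| Build a point-mapping function from however many valid correspondences |
| survive unique_points: homography for >= 4 points in general position |
| (least-squares affine when either set is collinear), exact affine for |
| 3, similarity for 2, translation for 1, identity for none. |
| """ |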
| src, dst = unique_points(src_pts, dst_pts) |
| n = len(src) |
| if n >= 4: |
| if are_collinear(src) or are_collinear(dst): |
| H = affine_from_4_points(src, dst) |
| return lambda pt: apply_transform(H, pt) |
| else: |
| H = four_point_homography(src, dst) |
| return lambda pt: apply_homo_transform(H, pt) |
| elif n==3: |
| H = three_point_affine(src,dst) |
| elif n==2: |
| H = line_to_line_transform(src[0],src[1],dst[0],dst[1]) |
| elif n==1: |
| t = dst[0]-src[0] |
| H = np.eye(3) |
| H[:2,2] = t |
| else: |
| H = np.eye(3) |
| return lambda pt: apply_transform(H, pt) |
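| # Illustrative fallback (not part of the pipeline): with a single valid |
| # correspondence the result degrades to a pure translation: |
| # f = robust_transform([(1, 1)], [(4, 5)]) |
| # f((2, 2)) -> (5, 6) |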
|
|
| def apply_homo_transform(M, P): |
| """ |
| Transform a single 2D point P = (x, y) with a 3x3 homography M, |
| including the perspective divide. Returns integer coordinates. |
| """ |
| x, y = P[0], P[1] |
| |
| w = M[2, 0] * x + M[2, 1] * y + M[2, 2] |
| x_new = (M[0, 0] * x + M[0, 1] * y + M[0, 2]) / w |
| y_new = (M[1, 0] * x + M[1, 1] * y + M[1, 2]) / w |
| |
| # Return the transformed point itself (not the displacement), matching |
| # apply_transform and the bounds checks in make_possible_keypoints. |
| return (int(x_new), int(y_new)) |
|
|
| def apply_transform(M, P): |
| """ |
| Transform a single 2D point using a 3x3 transformation matrix H. |
| Optimized version avoiding array creation. |
| |
| Args: |
| H : 3x3 numpy array |
| Transformation matrix (homography, affine, similarity, etc.) |
| point : (x, y) array-like |
| Single point coordinates to transform. |
| |
| Returns: |
| (x', y') : Transformed point coordinates |
| """ |
| |
| x, y = P[0], P[1] |
| x_new = M[0, 0] * x + M[0, 1] * y + M[0, 2] |
| y_new = M[1, 0] * x + M[1, 1] * y + M[1, 2] |
| return (int(x_new), int(y_new)) |
| |
| def pick_pt(points): |
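| """ |
| Propose template-keypoint indices to try next to the detected ones: for |
| each detected index, take its neighbours from GROUPS_ARRAY (assumed to |
| map each of the 32 template keypoints to related indices) that are |
| neither already detected nor already proposed. Indices outside [0, 32) |
| are ignored. |
| """ |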
| |
| if not points: |
| return [] |
| pts_arr = np.asarray(points, dtype=np.int32) |
| seen = np.zeros(32, dtype=bool) |
| valid_mask = (pts_arr >= 0) & (pts_arr < 32) |
| seen[pts_arr[valid_mask]] = True |
| |
| out_seen = np.zeros(32, dtype=bool) |
| out = [] |
| for p in pts_arr[valid_mask]: |
| neigh = GROUPS_ARRAY[p] |
| candidates = neigh[~seen[neigh] & ~out_seen[neigh]] |
| out_seen[candidates] = True |
| out.extend(candidates.tolist()) |
| return out |
|
|
| def make_possible_keypoints(all_keypoints, frame_width, frame_height, limit=2): |
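| """ |
| Expand keypoint sets with 2-4 non-zero detections into candidates with |
| exactly 5 points: fit a transform from the template (KEYPOINTS_NP) to |
| the detections, project neighbouring template points into the frame, |
| and fill the missing points with up to `limit` combinations. Sets with |
| more than 4 detections pass through unchanged. |
| """ |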
| |
| if not all_keypoints: |
| return [] |
|
|
| results = [] |
|
|
| for keypoints in all_keypoints: |
| |
| |
| |
| arr = np.asarray(keypoints, dtype=np.int32) |
|
|
| |
| if arr.ndim != 2 or arr.shape[1] != 2: |
| continue |
|
|
| |
| mask = (arr[:, 0] != 0) & (arr[:, 1] != 0) |
| non_zero_count = mask.sum() |
| |
| |
| if non_zero_count > 4: |
| results.append(keypoints) |
| continue |
| |
| if non_zero_count < 2: |
| continue |
|
|
| |
| if non_zero_count == 4: |
| results.append(keypoints) |
|
|
| |
| non_zero_idxs = np.flatnonzero(mask) |
| |
| |
| src = KEYPOINTS_NP[non_zero_idxs] |
| dest = arr[non_zero_idxs].astype(np.float32) |
|
|
| try: |
| |
| transform_func = robust_transform(src, dest) |
| except Exception: |
| continue |
| |
| |
| candidate_idxs = pick_pt(non_zero_idxs.tolist()) |
| if not candidate_idxs: |
| continue |
|
|
| |
| valid_cache = {} |
| valid_real_idxs = [] |
|
|
| for idx in candidate_idxs: |
| |
| t_pt = transform_func(KEYPOINTS_NP[idx]) |
| |
| |
| tx, ty = t_pt[0], t_pt[1] |
| |
| |
| if 0 <= tx < frame_width and 0 <= ty < frame_height: |
| valid_cache[idx] = (int(tx), int(ty)) |
| valid_real_idxs.append(idx) |
|
|
| |
| n_missing = 5 - non_zero_count |
| if len(valid_real_idxs) < n_missing: |
| continue |
|
|
| |
| cnt = 0 |
| for group in combinations(valid_real_idxs, n_missing): |
| if cnt >= limit: |
| break |
| cnt += 1 |
| |
| |
| |
| new_result = list(keypoints) |
| |
| |
| for idx in group: |
| new_result[idx] = valid_cache[idx] |
| |
| results.append(new_result) |
|
|
| return results |
| |
| def _get_shared_eval_executor(max_workers: int) -> ThreadPoolExecutor: |
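| """Create the process-wide ThreadPoolExecutor on first use and reuse it afterwards.""" |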
| global _shared_eval_executor |
| if _shared_eval_executor is None: |
| _shared_eval_executor = ThreadPoolExecutor(max_workers=max_workers) |
| return _shared_eval_executor |
|
|
| def evaluates(jobs, h, w, total_frames: int): |
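| """ |
| Score every (keypoints, frame_index) job and keep the best-scoring |
| keypoint set per frame. Identical jobs are deduplicated up front; |
| small workloads run serially, larger ones fan out over the shared |
| thread pool in chunks. Frames with no winning job keep a 32-point |
| zero placeholder. |
| """ |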
| |
| if len(jobs) == 0: |
| return [] |
| |
| unique_jobs = [] |
| seen = set() |
| |
| for (job, frame_index) in jobs: |
| try: |
| |
| if isinstance(job, np.ndarray): |
| key_bytes = job.astype(np.int32).tobytes() if job.dtype != np.int32 else job.tobytes() |
| else: |
| key_bytes = np.asarray(job, dtype=np.int32).tobytes() |
| |
| sig = (frame_index, key_bytes) |
| if sig in seen: |
| continue |
| seen.add(sig) |
| unique_jobs.append((job, frame_index, key_bytes)) |
| except Exception: |
| continue |
|
|
| if len(unique_jobs) <= 10: |
| scores_unique = [ |
| evaluate_keypoints_for_frame(job, frame_index, h, w, precomputed_key=(key_bytes, w, h)) |
| for (job, frame_index, key_bytes) in unique_jobs |
| ] |
| else: |
| cpu_count = max(1, (os.cpu_count() or 1)) |
| max_workers = min(max(2, cpu_count), 8) |
|
|
| chunk_size = 500 |
| scores_unique = [] |
| ex = _get_shared_eval_executor(max_workers) |
| |
| for i in range(0, len(unique_jobs), chunk_size): |
| chunk = unique_jobs[i:i + chunk_size] |
| scores_unique.extend( |
| ex.map( |
| lambda pair: evaluate_keypoints_for_frame(pair[0], pair[1], h, w, precomputed_key=(pair[2], w, h)), |
| chunk, |
| ) |
| ) |
| scores = np.full(total_frames, -1.0, dtype=np.float32) |
| results = [[(0, 0)] * 32 for _ in range(total_frames)] |
|
|
| for score, (k, frame_index, _) in zip(scores_unique, unique_jobs): |
| if score > scores[frame_index]: |
| scores[frame_index] = score |
| results[frame_index] = k |
| |
| return results |
|
|
| def fix_keypoints_pri( |
| results_frames, |
| frame_width: int, |
| frame_height: int |
| ) -> list[Any]: |
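| """ |
| For each frame, gather candidate keypoint sets from the frame itself, |
| a sliding window of up to `limit` frames on each side, and a sparse |
| full-pitch template, then let evaluates() pick the best set per frame. |
| """ |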
| sparse_template = convert_keypoints_to_val_format(_generate_sparse_template_keypoints(frame_width, frame_height)) |
| max_frames = len(results_frames) |
| limit = 30 |
| before = deque(maxlen=limit) |
| after = deque(maxlen=limit) |
| |
| all_possible = [None] * max_frames |
| # Expand only the first `limit` frames eagerly; later frames are filled |
| # lazily inside the main loop just before they enter the window. |
| for i in range(min(limit, max_frames)): |
| all_possible[i] = make_possible_keypoints([results_frames[i]], frame_width, frame_height) |
| for i in range(1, min(limit, max_frames)): |
| after.append(all_possible[i]) |
| |
| current = all_possible[0] if max_frames > 0 else [] |
| total_jobs = [] |
|
|
| for frame_index in range(max_frames): |
| if frame_index < max_frames - limit: |
| future_idx = frame_index + limit |
| if all_possible[future_idx] is None: |
| all_possible[future_idx] = make_possible_keypoints([results_frames[future_idx]], frame_width, frame_height) |
| after.append(all_possible[future_idx]) |
| |
| frame_jobs = [(kpts, frame_index) for kpts in current] |
| for t in after: |
| frame_jobs.extend([(kpts, frame_index) for kpts in t]) |
| for t in before: |
| frame_jobs.extend([(kpts, frame_index) for kpts in t]) |
| frame_jobs.append((sparse_template, frame_index)) |
|
|
| total_jobs.extend(frame_jobs) |
| |
| before.append(current) |
| |
| if len(after) != 0: |
| current = after.popleft() |
| |
| start_time = time.time() |
| results = evaluates(total_jobs, frame_height, frame_width, max_frames) |
| print(f"Evaluation time: {time.time() - start_time}") |
| return results |
|
|
| |
| def normalize_results(frame_results, threshold): |
| if not frame_results: |
| return [] |
| |
| results_array = [] |
| for result in frame_results: |
| arr = np.array(result, dtype=np.float32) |
| if arr.size == 0: |
| results_array.append([]) |
| continue |
| |
| mask = arr[:, 2] > threshold |
| scaled = arr[:, :2] |
| scaled = np.where(mask[:, None], scaled, 0) |
| results_array.append([(int(x), int(y)) for x, y in scaled]) |
| |
| return results_array |
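| # Illustrative example: low-confidence points are zeroed, not dropped, |
| # so indices stay aligned with the template keypoints: |
| # normalize_results([[(10, 20, 0.9), (5, 5, 0.1)]], threshold=0.3) |
| # -> [[(10, 20), (0, 0)]] |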
|
|
| def convert_to_gray(image): |
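| """ |
| Build the edge map used for line scoring: grayscale -> top-hat (with |
| the module-level `kernel`, defined elsewhere) to isolate thin bright |
| structures -> Gaussian blur -> Canny. |
| """ |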
| gray = cvtColor(image, COLOR_BGR2GRAY) |
| gray = morphologyEx(gray, MORPH_TOPHAT, kernel, dst=gray) |
| GaussianBlur(gray, (5, 5), 0, dst=gray) |
| image_edges = Canny(gray, 30, 100) |
| return image_edges |
|
|
| class Miner: |
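| """Runs detection, team classification, and pitch-keypoint estimation over frame batches.""" |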
| def __init__(self, path_hf_repo: Path) -> None: |
|
|
| global _OSNET_MODEL, team_classifier_path |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| self.device = device |
| self.path_hf_repo = path_hf_repo |
|
|
| print("✅ Loading YOLO models...") |
|
|
| self.bbox_model = YOLO(path_hf_repo / "player_detect.pt") |
|
|
| print("✅ Loading Team Classifier...") |
|
|
|
|
| self.keypoints_model = load_kp_model(path_hf_repo, device) |
| self.pitch_batch_size = 4 |
| self.osnet_batch_size = 8 |
| self.kp_threshold = 0.3 |
|
|
| team_classifier_path = path_hf_repo / "osnet_model.pth.tar-100" |
|
|
| _OSNET_MODEL = load_osnet(device, team_classifier_path) |
|
|
| print("✅ All models loaded") |
|
|
| def predict_batch(self, batch_images: list[ndarray], offset: int, n_keypoints: int): |
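| """ |
| Run the full per-batch pipeline: detect boxes, enforce per-class |
| limits (one ball, one goalkeeper, at most three referees), classify |
| teams, detect and repair pitch keypoints, and assemble TVFrameResult |
| objects (frame ids offset by `offset`). |
| """ |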
| start = time.time() |
| |
| bboxes = {} |
| bbox_model_results = self.bbox_model.predict(batch_images, verbose=False) |
| print(f"Detect objects: {time.time() - start}") |
|
|
| start = time.time() |
| track_id = 0 |
| for frame_number_in_batch, detection in enumerate(bbox_model_results): |
| boxes: list[BoundingBox] = [] |
| for box in detection.boxes.data: |
| x1, y1, x2, y2, conf, cls_id = box.tolist() |
| temp_track_id = None |
| if cls_id == PLAYER_ID: |
| track_id += 1 |
| temp_track_id = track_id |
|
|
| boxes.append( |
| BoundingBox( |
| x1=int(x1), y1=int(y1), |
| x2=int(x2), y2=int(y2), |
| cls_id=int(cls_id), |
| conf=float(conf), |
| track_id=temp_track_id, |
| ) |
| ) |
|
|
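| # Keep only the most confident ball detection per frame. |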
| ball_idxs = [i for i, b in enumerate(boxes) if b.cls_id == BALL_ID] |
| if len(ball_idxs) > 1: |
| best_i = max(ball_idxs, key=lambda i: boxes[i].conf) |
| boxes = [ |
| b for i, b in enumerate(boxes) |
| if not (b.cls_id == BALL_ID and i != best_i) |
| ] |
|
|
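| # If several goalkeepers are detected, keep the most confident one and |
| # reclassify the rest as players. |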
| gk_idxs = [i for i, b in enumerate(boxes) if b.cls_id == GK_ID] |
| if len(gk_idxs) > 1: |
| best_gk_i = max(gk_idxs, key=lambda i: boxes[i].conf) |
| for i in gk_idxs: |
| if i != best_gk_i: |
| boxes[i].cls_id = PLAYER_ID |
| track_id += 1 |
| boxes[i].track_id = track_id |
|
|
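| # Cap referees at three per frame; reclassify extras as players. |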
| ref_idxs = [i for i, b in enumerate(boxes) if b.cls_id == REF_ID] |
| if len(ref_idxs) > 3: |
| |
| ref_idxs_sorted = sorted(ref_idxs, key=lambda i: boxes[i].conf, reverse=True) |
| keep = set(ref_idxs_sorted[:3]) |
| for i in ref_idxs: |
| if i not in keep: |
| boxes[i].cls_id = PLAYER_ID |
| track_id += 1 |
| boxes[i].track_id = track_id |
|
|
| bboxes[offset + frame_number_in_batch] = boxes |
|
|
| classify_teams_batch( |
| frames=batch_images, |
| batch_boxes=bboxes, |
| batch_size=self.osnet_batch_size, |
| device=self.device |
| ) |
| print(f"finish team classify") |
| print(f"Object Tracking: {time.time() - start}") |
|
|
| start = time.time() |
|
|
| gc.collect() |
| if torch.cuda.is_available(): |
| torch.cuda.empty_cache() |
| torch.cuda.synchronize() |
|
|
| pitch_size = min(self.pitch_batch_size, len(batch_images)) |
| keypoints_result = process_batch_input( |
| batch_images, |
| self.keypoints_model, |
| self.kp_threshold, |
| self.device, |
| batch_size=pitch_size, |
| ) |
| print(f"Kps detection: {time.time() - start}") |
| start = time.time() |
| keypoints = normalize_keypoints(keypoints_result, batch_images, n_keypoints) |
| for idx, kpts in enumerate(keypoints): |
| keypoints[idx] = fix_keypoints(kpts, n_keypoints) |
|
|
| h, w = batch_images[0].shape[:2] |
| keypoints_by_frame = fix_keypoints_pri(keypoints, w, h) |
| print(f"Fix kps: {time.time() - start}") |
|
|
| results = [] |
| for i in range(len(batch_images)): |
| frame_number = offset + i |
| results.append( |
| TVFrameResult( |
| frame_id=frame_number, |
| boxes=bboxes.get(frame_number, []), |
| keypoints=convert_keypoints_to_val_format(keypoints_by_frame[frame_number - offset]) |
| ) |
| ) |
|
|
| return results |
|
|
|
|