| import cv2
|
| import numpy as np
|
| from sklearn.cluster import KMeans
|
| import warnings
|
| import time
|
|
|
| import torch
|
| from torchvision.ops import batched_nms
|
| from numpy import ndarray
|
|
|
# Silence the warning categories below for the whole process; the filters are
# installed in the same order as three separate filterwarnings() calls would.
for _warning_category in (RuntimeWarning, FutureWarning, UserWarning):
    warnings.filterwarnings('ignore', category=_warning_category)
del _warning_category


import logging

# Only let ERROR (and above) records through the 'sklearn' logger.
logging.getLogger('sklearn').setLevel(logging.ERROR)
|
|
|
def get_grass_color(img):
    """Return the mean colour of the green-ish pixels of a frame.

    Args:
        img: image handed to ``cv2.cvtColor`` with ``COLOR_BGR2HSV``, i.e. it
            is interpreted as a BGR image.

    Returns:
        The first three components of ``cv2.mean(img, mask=mask)``: the
        per-channel mean of ``img`` over the pixels whose HSV value falls
        inside the fixed green window defined below.
    """
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

    # Fixed HSV window that decides which pixels count as grass.
    lower_green = np.array([30, 40, 40])
    upper_green = np.array([80, 255, 255])
    mask = cv2.inRange(hsv, lower_green, upper_green)

    # cv2.mean applies the mask itself, so no masked copy of the frame is
    # built (the previous bitwise_and result was never read).
    grass_color = cv2.mean(img, mask=mask)
    return grass_color[:3]
|
|
|
def get_players_boxes(frame, result):
    """Collect crops and detection entries for every player detection.

    Args:
        frame: image array indexed as ``frame[y, x]``.
        result: iterable of ``(box, score, cls)`` where ``box`` is a NumPy
            array ``[x1, y1, x2, y2]`` and ``int(cls) == 0`` marks a player.

    Returns:
        ``(players_imgs, players_boxes)``: two lists of equal length. Entry
        ``i`` of ``players_imgs`` is the crop for entry ``i`` of
        ``players_boxes``, which is ``[box, score, cls]`` with the box left
        exactly as it was passed in. Detections whose box does not overlap the
        frame are dropped, so the two lists never contain an empty crop.
    """
    frame_height, frame_width = frame.shape[:2]
    players_imgs = []
    players_boxes = []
    for box, score, cls in result:
        if int(cls) != 0:
            continue

        x1, y1, x2, y2 = box.astype(int)
        # Clamp to the frame: a negative coordinate used directly as a slice
        # bound would be read as "count from the end" and crop the wrong area.
        x1, x2 = (min(max(int(v), 0), frame_width) for v in (x1, x2))
        y1, y2 = (min(max(int(v), 0), frame_height) for v in (y1, y2))
        if x1 >= x2 or y1 >= y2:
            # Nothing of the box is inside the frame.
            continue

        players_imgs.append(frame[y1:y2, x1:x2])
        players_boxes.append([box, score, cls])
    return players_imgs, players_boxes
|
|
|
def get_kits_colors(players, grass_hsv=None, frame=None):
    """Compute one mean kit colour per usable player crop.

    Args:
        players: iterable of player crops (arrays passed to ``cv2.cvtColor``
            with ``COLOR_BGR2HSV``).
        grass_hsv: array indexed as ``grass_hsv[0, 0, 0]`` for the grass hue.
            When ``None`` it is derived from ``frame`` via ``get_grass_color``.
        frame: frame used only when ``grass_hsv`` is ``None``.

    Returns:
        List of length-3 NumPy arrays, one per crop that was processed. Crops
        that are ``None``, empty, or not 3-dimensional are skipped, so the
        result can be shorter than ``players``.
    """
    kits_colors = []
    if grass_hsv is None:
        grass_color = get_grass_color(frame)
        grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV)

    # Convert the hue to a Python int before the +/-10: on a uint8 scalar the
    # subtraction can wrap around (e.g. 5 - 10 -> 251) instead of going
    # negative. The bounds do not depend on the crop, so build them once.
    grass_hue = int(grass_hsv[0, 0, 0])
    lower_green = np.array([grass_hue - 10, 40, 40])
    upper_green = np.array([grass_hue + 10, 255, 255])

    for player_img in players:
        if player_img is None or player_img.size == 0 or len(player_img.shape) != 3:
            continue

        hsv = cv2.cvtColor(player_img, cv2.COLOR_BGR2HSV)

        # Pixels close to the grass hue are excluded from the kit colour.
        grass_mask = cv2.inRange(hsv, lower_green, upper_green)
        kit_mask = cv2.bitwise_not(grass_mask)

        # Only the top half of the crop contributes to the mean.
        upper_mask = np.zeros(player_img.shape[:2], np.uint8)
        upper_mask[0:player_img.shape[0] // 2, 0:player_img.shape[1]] = 255
        kit_mask = cv2.bitwise_and(kit_mask, upper_mask)

        kit_color = np.array(cv2.mean(player_img, mask=kit_mask)[:3])
        kits_colors.append(kit_color)
    return kits_colors
|
|
|
def get_kits_classifier(kits_colors):
    """Fit a two-cluster KMeans model on the given kit colours.

    Args:
        kits_colors: sequence of colour samples passed to ``KMeans.fit``.

    Returns:
        The fitted ``KMeans`` instance, or ``None`` when fewer than two
        samples are available.
    """
    # Two clusters cannot be fitted on zero or one sample.
    if len(kits_colors) < 2:
        return None

    classifier = KMeans(n_clusters=2)
    classifier.fit(kits_colors)
    return classifier
|
|
|
def classify_kits(kits_classifer, kits_colors):
    """Predict a cluster label for each kit colour.

    Args:
        kits_classifer: object exposing ``predict``, or ``None``.
        kits_colors: sequence of colour samples.

    Returns:
        ``kits_classifer.predict(kits_colors)`` when a classifier is given and
        there is at least one colour; otherwise the one-element array ``[0]``.
    """
    has_classifier = kits_classifer is not None
    if has_classifier and len(kits_colors) != 0:
        return kits_classifer.predict(kits_colors)

    # Fallback: a single label 0, regardless of how many colours were passed.
    return np.array([0])
|
|
|
def get_left_team_label(players_boxes, kits_colors, kits_clf):
    """Pick the cluster label whose players have the smaller mean ``x1``.

    Args:
        players_boxes: sequence of ``[box, score, cls]`` entries; only
            ``box`` (a NumPy array ``[x1, y1, x2, y2]``) is read.
        kits_colors: kit colours, indexed in parallel with ``players_boxes``.
        kits_clf: classifier forwarded to ``classify_kits``.

    Returns:
        ``1`` when the mean ``x1`` of the players labelled 0 is greater than
        the mean ``x1`` of all other players, else ``0``. A group without
        players counts as mean ``0``.
    """
    xs_by_team = {0: [], 1: []}
    for i, entry in enumerate(players_boxes):
        x1, _, _, _ = entry[0].astype(int)
        team = classify_kits(kits_clf, [kits_colors[i]]).item()
        # Any label other than 0 is grouped with team 1.
        xs_by_team[0 if team == 0 else 1].append(x1)

    mean_x_team_0 = np.average(xs_by_team[0]) if xs_by_team[0] else 0
    mean_x_team_1 = np.average(xs_by_team[1]) if xs_by_team[1] else 0

    return 1 if mean_x_team_0 - mean_x_team_1 > 0 else 0
|
|
|
def check_box_boundaries(boxes, img_height, img_width):
    """
    Keep only the boxes that lie inside the image and have positive extent.

    A box is kept when ``0 <= x1 < x2 < img_width`` and
    ``0 <= y1 < y2 < img_height``; every other box is dropped rather than
    clipped. The kept boxes are then clamped to ``[0, img_width - 1]`` /
    ``[0, img_height - 1]``, which only changes fractional coordinates that
    lie within one pixel of the right or bottom border.

    Args:
        boxes: numpy array of shape (N, 4) with [x1, y1, x2, y2] format
        img_height: height of the image
        img_width: width of the image

    Returns:
        valid_boxes: numpy array with the kept boxes (a copy; ``boxes`` is
            not modified), or an empty 1-D array when nothing is kept
        valid_indices: row indices of the kept boxes in ``boxes``, or an
            empty 1-D array when nothing is kept
    """
    left, top, right, bottom = (boxes[:, col] for col in range(4))

    inside = (left >= 0) & (top >= 0) & (right < img_width) & (bottom < img_height)
    well_formed = (left < right) & (top < bottom)
    keep = inside & well_formed

    if not keep.any():
        return np.array([]), np.array([])

    # Boolean indexing copies, so the clamping below never touches `boxes`.
    kept_boxes = boxes[keep]
    kept_indices = np.flatnonzero(keep)

    kept_boxes[:, [0, 2]] = np.clip(kept_boxes[:, [0, 2]], 0, img_width - 1)
    kept_boxes[:, [1, 3]] = np.clip(kept_boxes[:, [1, 3]], 0, img_height - 1)

    return kept_boxes, kept_indices
|
|
|
def process_team_identification_batch(frames, results, kits_clf, left_team_label, grass_hsv):
    """
    Turn raw per-frame detections into labelled result dictionaries.

    Detections with model class 0 are assigned output class 6 or 7 depending
    on their kit cluster; classes 1, 2, 3 and 4 are remapped to 1, 0, 3 and 3;
    any other class is dropped. Detections whose integer box is rejected by
    ``check_box_boundaries`` are dropped as well.

    Args:
        frames: list of frames
        results: list of detection results for each frame; each detection is
            a ``(box, score, cls)`` triple with ``box`` a NumPy array
        kits_clf: trained kit classifier, or None to skip team assignment
        left_team_label: cluster label that maps to output class 6
        grass_hsv: grass color in HSV format, forwarded to ``get_kits_colors``

    Returns:
        processed_results: one list per frame holding dictionaries with the
            keys "id" (running index within the frame), "bbox", "class_id"
            and "conf"
    """
    # Model class id -> output class id for every class except 0.
    fixed_labels = {1: 1, 2: 0, 3: 3, 4: 3}
    processed_results = []

    for frame_idx, frame in enumerate(frames):
        frame_detections = results[frame_idx]
        if not frame_detections:
            processed_results.append([])
            continue

        frame_h, frame_w = frame.shape[0], frame.shape[1]

        # Pass 1: crop every class-0 detection that lies fully inside the frame.
        players_imgs = []
        player_indices = []
        for det_idx, (box, _score, cls) in enumerate(frame_detections):
            if int(cls) != 0:
                continue
            x1, y1, x2, y2 = box.astype(int)
            fits = (x1 >= 0 and y1 >= 0 and x2 < frame_w and y2 < frame_h
                    and x1 < x2 and y1 < y2)
            if not fits:
                continue
            crop = frame[y1:y2, x1:x2]
            if crop.size > 0:
                players_imgs.append(crop)
                player_indices.append(det_idx)

        # Detection index -> predicted cluster label for the cropped players.
        player_team_map = {}
        if players_imgs and kits_clf is not None:
            kits_colors = get_kits_colors(players_imgs, grass_hsv)
            teams = classify_kits(kits_clf, kits_colors)
            for pos, team in enumerate(teams):
                player_team_map[player_indices[pos]] = team.item()

        # Pass 2: emit one dictionary per detection that survives the checks.
        frame_results = []
        for det_idx, (box, score, cls) in enumerate(frame_detections):
            label = int(cls)
            x1, y1, x2, y2 = box.astype(int)

            valid_boxes, _ = check_box_boundaries(
                np.array([[x1, y1, x2, y2]]), frame_h, frame_w
            )
            if len(valid_boxes) == 0:
                continue
            x1, y1, x2, y2 = valid_boxes[0].astype(int)

            if label == 0:
                # Players without a cluster assignment default to class 6.
                if det_idx in player_team_map and player_team_map[det_idx] != left_team_label:
                    final_label = 7
                else:
                    final_label = 6
            elif label in fixed_labels:
                final_label = fixed_labels[label]
            else:
                continue

            frame_results.append({
                # Ids count only the detections that were actually emitted.
                "id": int(len(frame_results)),
                "bbox": [int(x1), int(y1), int(x2), int(y2)],
                "class_id": int(final_label),
                "conf": float(score)
            })

        processed_results.append(frame_results)

    return processed_results
|
|
|
def convert_numpy_types(obj):
    """Convert numpy types to native Python types for JSON serialization.

    Dictionaries, lists and tuples are walked recursively. NumPy booleans,
    integers and floats become ``bool``, ``int`` and ``float``; arrays become
    nested lists via ``tolist()``. Anything else is returned unchanged.

    Args:
        obj: value to convert.

    Returns:
        The converted value. Dicts and lists are rebuilt as new objects;
        tuples are rebuilt as plain tuples.
    """
    # np.bool_ is not an np.integer subclass and json cannot encode it.
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        return {key: convert_numpy_types(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_numpy_types(item) for item in obj]
    if isinstance(obj, tuple):
        # Tuples can hide numpy scalars too, e.g. (box, score, cls) triples.
        return tuple(convert_numpy_types(item) for item in obj)
    return obj
|
|
|
def pre_process_img(frames, scale):
    """Resize frames to a square and stack them into one float32 batch.

    Args:
        frames: iterable of images accepted by ``cv2.resize``.
        scale: side length of the square target size; truncated with ``int``.

    Returns:
        float32 array with the stacked frames, axes reordered with
        ``transpose(0, 3, 1, 2)`` and values divided by 255.
    """
    side = int(scale)
    resized = [cv2.resize(frame, (side, side)) for frame in frames]
    # Move the last axis of the stacked batch to position 1.
    batch = np.stack(resized).transpose(0, 3, 1, 2)
    return batch.astype(np.float32) / 255.0
|
|
|
def post_process_output(outputs, x_scale, y_scale, conf_thresh=0.6, nms_thresh=0.75,
                        ball_class_id=2, ball_conf_thresh=0.55):
    """Decode raw model output into per-image detections.

    Args:
        outputs: NumPy array of shape ``(B, C, N)``. Along ``C`` the first
            four channels are read as a centre-format box ``(x, y, w, h)``
            and the remaining channels as per-class logits.
        x_scale: factor applied to the x coordinates of the decoded boxes.
        y_scale: factor applied to the y coordinates of the decoded boxes.
        conf_thresh: a prediction is kept when its best class score is
            strictly greater than this value.
        nms_thresh: IoU threshold handed to ``batched_nms``.
        ball_class_id: class id that gets the relaxed threshold below
            (defaults to the previously hard-coded 2).
        ball_conf_thresh: per image, the single highest-scoring prediction of
            ``ball_class_id`` is also kept when its score exceeds this value
            (defaults to the previously hard-coded 0.55).

    Returns:
        A list with ``B`` entries. Each entry is a list of
        ``(box_xyxy, conf, class_id)`` tuples made of NumPy values, or an
        empty list when the image has no surviving detection.
    """
    batch_count, _, _ = outputs.shape
    preds = torch.from_numpy(outputs).permute(0, 2, 1)  # -> (B, N, C)
    boxes = preds[..., :4]
    class_scores = torch.sigmoid(preds[..., 4:])
    conf, class_id = class_scores.max(dim=2)

    mask = conf > conf_thresh

    # Keep the best `ball_class_id` candidate of every image even when it is
    # below conf_thresh, as long as it clears ball_conf_thresh. Done with
    # torch ops so tensors are never routed through NumPy functions.
    for i in range(batch_count):
        ball_idx = torch.nonzero(class_id[i] == ball_class_id, as_tuple=True)[0]
        if ball_idx.numel() > 0:
            top = ball_idx[torch.argmax(conf[i, ball_idx])]
            if conf[i, top] > ball_conf_thresh:
                mask[i, top] = True

    batch_idx, pred_idx = mask.nonzero(as_tuple=True)
    if len(batch_idx) == 0:
        return [[] for _ in range(batch_count)]

    boxes = boxes[batch_idx, pred_idx]
    conf = conf[batch_idx, pred_idx]
    class_id = class_id[batch_idx, pred_idx]

    # Centre format -> corner format, rescaled with x_scale / y_scale.
    x, y, w, h = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
    x1 = (x - w / 2) * x_scale
    y1 = (y - h / 2) * y_scale
    x2 = (x + w / 2) * x_scale
    y2 = (y + h / 2) * y_scale
    boxes_xyxy = torch.stack([x1, y1, x2, y2], dim=1)

    # batched_nms never suppresses across different `idxs` values, so passing
    # the image index keeps the images independent; no manual coordinate
    # offset is needed. Suppression is class-agnostic within an image.
    keep = batched_nms(boxes_xyxy, conf, batch_idx, nms_thresh)

    boxes_final = boxes_xyxy[keep]
    conf_final = conf[keep]
    class_final = class_id[keep]
    batch_final = batch_idx[keep]

    results = [[] for _ in range(batch_count)]
    for b in range(batch_count):
        in_image = batch_final == b
        if not in_image.any():
            continue
        results[b] = list(zip(boxes_final[in_image].numpy(),
                              conf_final[in_image].numpy(),
                              class_final[in_image].numpy()))
    return results
|
|
|
def player_detection_result(frames: list[ndarray], batch_size, model, kits_clf=None, left_team_label=None, grass_hsv=None):
    """Run detection on frames in batches and label every detection.

    Args:
        frames: list of frames. The scale factors for all frames are derived
            from the shape of ``frames[0]``.
        batch_size: number of frames sent to the model per call.
        model: object exposing ``get_inputs()`` and ``run(None, feed)``
            (looks like an ONNX Runtime session - confirm against caller).
        kits_clf: previously fitted kit classifier, or None.
        left_team_label: previously determined left-team label, or None.
        grass_hsv: previously determined grass colour in HSV, or None.

    Returns:
        ``(results, kits_clf, left_team_label, grass_hsv)``: ``results`` has
        one list of detection dictionaries per frame; the other three values
        are the (possibly newly estimated) team-model state, so the caller can
        pass them back in on the next call.
    """
    # Nothing to infer on; hand the team-model state back untouched.
    if len(frames) == 0:
        return [], kits_clf, left_team_label, grass_hsv

    height, width = frames[0].shape[:2]
    scale = 640.0
    x_scale = width / scale
    y_scale = height / scale

    # The input name does not change between batches.
    input_name = model.get_inputs()[0].name

    results = []
    for start in range(0, len(frames), batch_size):
        # Slicing already shortens the final batch when frames run out.
        batch_frames = frames[start:start + batch_size]
        imgs = pre_process_img(batch_frames, scale)

        outputs = model.run(None, {input_name: imgs})[0]
        raw_results = post_process_output(np.array(outputs), x_scale, y_scale)

        # As long as any part of the team model is missing, try to estimate
        # it from the first frame of the current batch.
        if kits_clf is None or left_team_label is None or grass_hsv is None:
            first_frame = batch_frames[0]
            first_frame_results = raw_results[0] if raw_results else []

            if first_frame_results:
                players_imgs, players_boxes = get_players_boxes(first_frame, first_frame_results)
                if players_imgs:
                    grass_color = get_grass_color(first_frame)
                    grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV)

                    # get_kits_colors skips unusable crops. Drop the same
                    # entries from the box list first so colour i still
                    # belongs to box i in get_left_team_label.
                    usable = [
                        (img, entry)
                        for img, entry in zip(players_imgs, players_boxes)
                        if img is not None and img.size != 0 and len(img.shape) == 3
                    ]
                    usable_imgs = [img for img, _ in usable]
                    usable_boxes = [entry for _, entry in usable]

                    kits_colors = get_kits_colors(usable_imgs, grass_hsv)
                    if kits_colors:
                        kits_clf = get_kits_classifier(kits_colors)
                        if kits_clf is not None:
                            left_team_label = int(get_left_team_label(usable_boxes, kits_colors, kits_clf))

        processed_results = process_team_identification_batch(
            batch_frames, raw_results, kits_clf, left_team_label, grass_hsv
        )
        results.extend(convert_numpy_types(processed_results))

    return results, kits_clf, left_team_label, grass_hsv