| import cv2 |
| import numpy as np |
| from sklearn.cluster import KMeans |
| import warnings |
| import time |
|
|
| import torch |
| from torchvision.ops import batched_nms |
| from numpy import ndarray |
| |
| warnings.filterwarnings('ignore', category=RuntimeWarning) |
| warnings.filterwarnings('ignore', category=FutureWarning) |
| warnings.filterwarnings('ignore', category=UserWarning) |
|
|
| |
| import logging |
| logging.getLogger('sklearn').setLevel(logging.ERROR) |
|
|
| def get_grass_color(img): |
| |
| hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) |
|
|
| |
| lower_green = np.array([30, 40, 40]) |
| upper_green = np.array([80, 255, 255]) |
|
|
| |
| mask = cv2.inRange(hsv, lower_green, upper_green) |
|
|
| |
| masked_img = cv2.bitwise_and(img, img, mask=mask) |
| grass_color = cv2.mean(img, mask=mask) |
| return grass_color[:3] |
|
|
| def get_players_boxes(frame, result): |
| players_imgs = [] |
| players_boxes = [] |
| for (box, score, cls) in result: |
| label = int(cls) |
| if label == 0: |
| x1, y1, x2, y2 = box.astype(int) |
| player_img = frame[y1: y2, x1: x2] |
| players_imgs.append(player_img) |
| players_boxes.append([box, score, cls]) |
| return players_imgs, players_boxes |
|
|
| def get_kits_colors(players, grass_hsv=None, frame=None): |
| kits_colors = [] |
| if grass_hsv is None: |
| grass_color = get_grass_color(frame) |
| grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV) |
|
|
| for player_img in players: |
| |
| if player_img is None or player_img.size == 0 or len(player_img.shape) != 3: |
| continue |
| |
| |
| hsv = cv2.cvtColor(player_img, cv2.COLOR_BGR2HSV) |
|
|
| |
| lower_green = np.array([grass_hsv[0, 0, 0] - 10, 40, 40]) |
| upper_green = np.array([grass_hsv[0, 0, 0] + 10, 255, 255]) |
|
|
| |
| mask = cv2.inRange(hsv, lower_green, upper_green) |
|
|
| |
| mask = cv2.bitwise_not(mask) |
| upper_mask = np.zeros(player_img.shape[:2], np.uint8) |
| upper_mask[0:player_img.shape[0]//2, 0:player_img.shape[1]] = 255 |
| mask = cv2.bitwise_and(mask, upper_mask) |
|
|
| kit_color = np.array(cv2.mean(player_img, mask=mask)[:3]) |
|
|
| kits_colors.append(kit_color) |
| return kits_colors |
|
|
| def get_kits_classifier(kits_colors): |
| if len(kits_colors) == 0: |
| return None |
| if len(kits_colors) == 1: |
| |
| return None |
| kits_kmeans = KMeans(n_clusters=2) |
| kits_kmeans.fit(kits_colors) |
| return kits_kmeans |
|
|
| def classify_kits(kits_classifer, kits_colors): |
| if kits_classifer is None or len(kits_colors) == 0: |
| return np.array([0]) |
| team = kits_classifer.predict(kits_colors) |
| return team |
|
|
| def get_left_team_label(players_boxes, kits_colors, kits_clf): |
| left_team_label = 0 |
| team_0 = [] |
| team_1 = [] |
|
|
| for i in range(len(players_boxes)): |
| x1, y1, x2, y2 = players_boxes[i][0].astype(int) |
| team = classify_kits(kits_clf, [kits_colors[i]]).item() |
| if team == 0: |
| team_0.append(np.array([x1])) |
| else: |
| team_1.append(np.array([x1])) |
|
|
| team_0 = np.array(team_0) |
| team_1 = np.array(team_1) |
|
|
| |
| avg_team_0 = np.average(team_0) if len(team_0) > 0 else 0 |
| avg_team_1 = np.average(team_1) if len(team_1) > 0 else 0 |
| |
| if avg_team_0 - avg_team_1 > 0: |
| left_team_label = 1 |
|
|
| return left_team_label |
|
|
| def check_box_boundaries(boxes, img_height, img_width): |
| """ |
| Check if bounding boxes are within image boundaries and clip them if necessary. |
| |
| Args: |
| boxes: numpy array of shape (N, 4) with [x1, y1, x2, y2] format |
| img_height: height of the image |
| img_width: width of the image |
| |
| Returns: |
| valid_boxes: numpy array of valid boxes within boundaries |
| valid_indices: indices of valid boxes |
| """ |
| x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3] |
| |
| |
| valid_mask = (x1 >= 0) & (y1 >= 0) & (x2 < img_width) & (y2 < img_height) & (x1 < x2) & (y1 < y2) |
| |
| if not np.any(valid_mask): |
| return np.array([]), np.array([]) |
| |
| valid_boxes = boxes[valid_mask] |
| valid_indices = np.where(valid_mask)[0] |
| |
| |
| valid_boxes[:, 0] = np.clip(valid_boxes[:, 0], 0, img_width - 1) |
| valid_boxes[:, 1] = np.clip(valid_boxes[:, 1], 0, img_height - 1) |
| valid_boxes[:, 2] = np.clip(valid_boxes[:, 2], 0, img_width - 1) |
| valid_boxes[:, 3] = np.clip(valid_boxes[:, 3], 0, img_height - 1) |
| |
| return valid_boxes, valid_indices |
|
|
| def process_team_identification_batch(frames, results, kits_clf, left_team_label, grass_hsv): |
| """ |
| Process team identification and label formatting for batch results. |
| |
| Args: |
| frames: list of frames |
| results: list of detection results for each frame |
| kits_clf: trained kit classifier |
| left_team_label: label for left team |
| grass_hsv: grass color in HSV format |
| |
| Returns: |
| processed_results: list of processed results with team identification |
| """ |
| processed_results = [] |
| |
| for frame_idx, frame in enumerate(frames): |
| frame_results = [] |
| frame_detections = results[frame_idx] |
| |
| if not frame_detections: |
| processed_results.append([]) |
| continue |
| |
| |
| players_imgs = [] |
| players_boxes = [] |
| player_indices = [] |
| |
| for idx, (box, score, cls) in enumerate(frame_detections): |
| label = int(cls) |
| if label == 0: |
| x1, y1, x2, y2 = box.astype(int) |
| |
| |
| if (x1 >= 0 and y1 >= 0 and x2 < frame.shape[1] and y2 < frame.shape[0] and x1 < x2 and y1 < y2): |
| player_img = frame[y1:y2, x1:x2] |
| if player_img.size > 0: |
| players_imgs.append(player_img) |
| players_boxes.append([box, score, cls]) |
| player_indices.append(idx) |
| |
| |
| player_team_map = {} |
| |
| |
| if players_imgs and kits_clf is not None: |
| kits_colors = get_kits_colors(players_imgs, grass_hsv) |
| teams = classify_kits(kits_clf, kits_colors) |
| |
| |
| for i, team in enumerate(teams): |
| player_team_map[player_indices[i]] = team.item() |
| |
| id = 0 |
| |
| for idx, (box, score, cls) in enumerate(frame_detections): |
| label = int(cls) |
| x1, y1, x2, y2 = box.astype(int) |
| |
| |
| valid_boxes, valid_indices = check_box_boundaries( |
| np.array([[x1, y1, x2, y2]]), frame.shape[0], frame.shape[1] |
| ) |
| |
| if len(valid_boxes) == 0: |
| continue |
| |
| x1, y1, x2, y2 = valid_boxes[0].astype(int) |
| |
| |
| if label == 0: |
| if players_imgs and kits_clf is not None and idx in player_team_map: |
| team = player_team_map[idx] |
| if team == left_team_label: |
| final_label = 6 |
| else: |
| final_label = 7 |
| else: |
| final_label = 6 |
| |
| elif label == 1: |
| final_label = 1 |
| |
| elif label == 2: |
| final_label = 0 |
| |
| elif label == 3 or label == 4: |
| final_label = 3 |
| |
| else: |
| final_label = int(label) |
| |
| frame_results.append({ |
| "id": int(id), |
| "bbox": [int(x1), int(y1), int(x2), int(y2)], |
| "class_id": int(final_label), |
| "conf": float(score) |
| }) |
| id = id + 1 |
| |
| processed_results.append(frame_results) |
| |
| return processed_results |
|
|
| def convert_numpy_types(obj): |
| """Convert numpy types to native Python types for JSON serialization.""" |
| if isinstance(obj, np.integer): |
| return int(obj) |
| elif isinstance(obj, np.floating): |
| return float(obj) |
| elif isinstance(obj, np.ndarray): |
| return obj.tolist() |
| elif isinstance(obj, dict): |
| return {key: convert_numpy_types(value) for key, value in obj.items()} |
| elif isinstance(obj, list): |
| return [convert_numpy_types(item) for item in obj] |
| else: |
| return obj |
|
|
| def pre_process_img(frames, scale): |
| imgs = np.stack([cv2.resize(frame, (int(scale), int(scale))) for frame in frames]) |
| imgs = imgs.transpose(0, 3, 1, 2) |
| imgs = imgs.astype(np.float32) / 255.0 |
| return imgs |
|
|
| def post_process_output(outputs, x_scale, y_scale, conf_thresh=0.6, nms_thresh=0.75): |
| B, C, N = outputs.shape |
| outputs = torch.from_numpy(outputs) |
| outputs = outputs.permute(0, 2, 1) |
| boxes = outputs[..., :4] |
| class_scores = 1 / (1 + torch.exp(-outputs[..., 4:])) |
| conf, class_id = class_scores.max(dim=2) |
|
|
| mask = conf > conf_thresh |
| |
| for i in range(class_id.shape[0]): |
| |
| ball_idx = np.where(class_id[i] == 2)[0] |
| if ball_idx.size > 0: |
| |
| top = ball_idx[np.argmax(conf[i, ball_idx])] |
| if conf[i, top] > 0.55: |
| mask[i, top] = True |
| |
| |
| |
| |
| batch_idx, pred_idx = mask.nonzero(as_tuple=True) |
|
|
| if len(batch_idx) == 0: |
| return [[] for _ in range(B)] |
| |
| boxes = boxes[batch_idx, pred_idx] |
| conf = conf[batch_idx, pred_idx] |
| class_id = class_id[batch_idx, pred_idx] |
|
|
| x, y, w, h = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3] |
| x1 = (x - w / 2) * x_scale |
| y1 = (y - h / 2) * y_scale |
| x2 = (x + w / 2) * x_scale |
| y2 = (y + h / 2) * y_scale |
| boxes_xyxy = torch.stack([x1, y1, x2, y2], dim=1) |
|
|
| max_coord = 1e4 |
| offset = batch_idx.to(boxes_xyxy) * max_coord |
| boxes_for_nms = boxes_xyxy + offset[:, None] |
|
|
| keep = batched_nms(boxes_for_nms, conf, batch_idx, nms_thresh) |
|
|
| boxes_final = boxes_xyxy[keep] |
| conf_final = conf[keep] |
| class_final = class_id[keep] |
| batch_final = batch_idx[keep] |
|
|
| results = [[] for _ in range(B)] |
| for b in range(B): |
| mask_b = batch_final == b |
| if mask_b.sum() == 0: |
| continue |
| results[b] = list(zip(boxes_final[mask_b].numpy(), |
| conf_final[mask_b].numpy(), |
| class_final[mask_b].numpy())) |
| return results |
|
|
| def player_detection_result(frames: list[ndarray], batch_size, model, kits_clf=None, left_team_label=None, grass_hsv=None): |
| start_time = time.time() |
| |
| |
| height, width = frames[0].shape[:2] |
| scale = 640.0 |
| x_scale = width / scale |
| y_scale = height / scale |
|
|
| |
| |
| infer_time = time.time() |
| kits_clf = kits_clf |
| left_team_label = left_team_label |
| grass_hsv = grass_hsv |
| results = [] |
| for i in range(0, len(frames), batch_size): |
| if i + batch_size > len(frames): |
| batch_size = len(frames) - i |
| batch_frames = frames[i:i + batch_size] |
| imgs = pre_process_img(batch_frames, scale) |
|
|
| input_name = model.get_inputs()[0].name |
| outputs = model.run(None, {input_name: imgs})[0] |
| raw_results = post_process_output(np.array(outputs), x_scale, y_scale) |
|
|
| if kits_clf is None or left_team_label is None or grass_hsv is None: |
| |
| first_frame = batch_frames[0] |
| first_frame_results = raw_results[0] if raw_results else [] |
| |
| if first_frame_results: |
| players_imgs, players_boxes = get_players_boxes(first_frame, first_frame_results) |
| if players_imgs: |
| grass_color = get_grass_color(first_frame) |
| grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV) |
| kits_colors = get_kits_colors(players_imgs, grass_hsv) |
| if kits_colors: |
| kits_clf = get_kits_classifier(kits_colors) |
| if kits_clf is not None: |
| left_team_label = int(get_left_team_label(players_boxes, kits_colors, kits_clf)) |
|
|
| |
| processed_results = process_team_identification_batch( |
| batch_frames, raw_results, kits_clf, left_team_label, grass_hsv |
| ) |
| |
| processed_results = convert_numpy_types(processed_results) |
| results.extend(processed_results) |
|
|
| |
| return results, kits_clf, left_team_label, grass_hsv |