File size: 14,212 Bytes
acf7a04
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
import cv2
import numpy as np
from sklearn.cluster import KMeans
import warnings
import time

import torch
from torchvision.ops import batched_nms
from numpy import ndarray
# Silence RuntimeWarning, FutureWarning and UserWarning at import time.
# NOTE(review): warnings.filterwarnings edits the interpreter-wide filter list,
# so these categories are hidden for every module in the process, not just
# this one.
warnings.filterwarnings('ignore', category=RuntimeWarning)
warnings.filterwarnings('ignore', category=FutureWarning) 
warnings.filterwarnings('ignore', category=UserWarning)

# Only let ERROR-and-above records from the 'sklearn' logger through.
import logging
logging.getLogger('sklearn').setLevel(logging.ERROR)

def get_grass_color(img):
    """Estimate the pitch colour of a frame from its green-ish pixels.

    Args:
        img: Image in BGR channel order (it is converted with
            ``cv2.COLOR_BGR2HSV``).

    Returns:
        The first three channel means reported by ``cv2.mean`` over the
        pixels whose HSV value falls inside the fixed green range below.
    """
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

    # Fixed HSV window treated as "grass green".
    lower_green = np.array([30, 40, 40])
    upper_green = np.array([80, 255, 255])
    mask = cv2.inRange(hsv, lower_green, upper_green)

    # cv2.mean honours the mask directly, so no masked copy of the image is
    # needed to average the selected pixels.
    grass_color = cv2.mean(img, mask=mask)
    return grass_color[:3]

def get_players_boxes(frame, result):
    """Collect the image crops and detections of all players in a frame.

    Args:
        frame: Image array indexed as ``frame[y, x, ...]``.
        result: Iterable of ``(box, score, cls)`` detections where ``box`` is
            a NumPy array ``[x1, y1, x2, y2]`` and class ``0`` is a player.

    Returns:
        ``(players_imgs, players_boxes)``: two lists of equal length. The
        first holds the crop of each player, the second the matching
        ``[box, score, cls]`` entry with the box left untouched.

    Boxes are clamped to the frame before cropping and detections whose
    clamped box is empty are dropped, so every returned crop is non-empty and
    the two lists stay index-aligned with anything derived from the crops.
    """
    frame_height, frame_width = frame.shape[:2]
    players_imgs = []
    players_boxes = []
    for box, score, cls in result:
        if int(cls) != 0:
            continue
        x1, y1, x2, y2 = box.astype(int)
        # A negative start would be interpreted by slicing as "from the end",
        # yielding an empty or unrelated crop, so clamp to the frame first.
        x1, y1 = max(x1, 0), max(y1, 0)
        x2, y2 = min(x2, frame_width), min(y2, frame_height)
        if x1 >= x2 or y1 >= y2:
            continue
        players_imgs.append(frame[y1:y2, x1:x2])
        players_boxes.append([box, score, cls])
    return players_imgs, players_boxes

def get_kits_colors(players, grass_hsv=None, frame=None):
    """Compute one mean kit colour per player crop.

    For every crop, pixels whose hue is within 10 of the grass hue (and whose
    saturation/value are at least 40) are excluded, only the upper half of
    the crop is considered, and the remaining pixels are averaged.

    Args:
        players: Iterable of player crops in BGR order.
        grass_hsv: Grass colour as a ``1x1x3`` HSV array. When ``None`` it is
            estimated from ``frame``.
        frame: Full frame used to estimate the grass colour when
            ``grass_hsv`` is not given.

    Returns:
        List of 3-element NumPy arrays (mean per channel of the crop).
        Crops that are ``None``, empty or not 3-dimensional are skipped, so
        the list can be shorter than ``players``.

    Raises:
        ValueError: If neither ``grass_hsv`` nor ``frame`` is provided.
    """
    if grass_hsv is None:
        if frame is None:
            raise ValueError("either grass_hsv or frame must be provided")
        grass_color = get_grass_color(frame)
        grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV)

    # Work on a plain int: the hue usually arrives as uint8, and subtracting
    # from an unsigned scalar can wrap around instead of going negative.
    grass_hue = int(grass_hsv[0, 0, 0])
    # The bounds depend only on the grass colour, so build them once. They
    # are kept inside the 8-bit range the crops are stored in.
    lower_green = np.array([max(grass_hue - 10, 0), 40, 40])
    upper_green = np.array([min(grass_hue + 10, 255), 255, 255])

    kits_colors = []
    for player_img in players:
        # Skip empty or invalid images
        if player_img is None or player_img.size == 0 or len(player_img.shape) != 3:
            continue

        hsv = cv2.cvtColor(player_img, cv2.COLOR_BGR2HSV)

        # Select everything that is NOT grass-coloured ...
        not_grass = cv2.bitwise_not(cv2.inRange(hsv, lower_green, upper_green))

        # ... restricted to the upper half of the crop.
        upper_mask = np.zeros(player_img.shape[:2], np.uint8)
        upper_mask[0:player_img.shape[0] // 2, 0:player_img.shape[1]] = 255
        mask = cv2.bitwise_and(not_grass, upper_mask)

        kit_color = np.array(cv2.mean(player_img, mask=mask)[:3])
        kits_colors.append(kit_color)
    return kits_colors

def get_kits_classifier(kits_colors):
    """Fit a two-cluster KMeans model on the given kit colours.

    Args:
        kits_colors: Sequence of colour vectors, one per player.

    Returns:
        The fitted ``KMeans`` instance, or ``None`` when fewer than two
        colours are available (no model is built in that case).
    """
    # Zero or one sample: nothing to split into two teams.
    if len(kits_colors) < 2:
        return None
    return KMeans(n_clusters=2).fit(kits_colors)

def classify_kits(kits_classifer, kits_colors):
    """Assign a team label to every kit colour.

    Args:
        kits_classifer: Object exposing ``predict(kits_colors)`` (for example
            the model returned by ``get_kits_classifier``), or ``None``.
        kits_colors: Sequence of colour vectors.

    Returns:
        Whatever ``kits_classifer.predict`` returns when a classifier and at
        least one colour are given. Otherwise an integer array of zeros
        (team 0) with one entry per colour, and a single entry when
        ``kits_colors`` is empty so callers that read one label still work.
    """
    if kits_classifer is None or len(kits_colors) == 0:
        # Default every input to team 0; keep at least one element.
        return np.zeros(max(len(kits_colors), 1), dtype=int)
    return kits_classifer.predict(kits_colors)

def get_left_team_label(players_boxes, kits_colors, kits_clf):
    """Decide which team label (0 or 1) belongs to the team on the left.

    Each player is classified from its kit colour, the left edge (``x1``) of
    the players of each team is averaged, and the team with the smaller
    average is reported as the left team.

    Args:
        players_boxes: List of ``[box, score, cls]`` entries where ``box`` is
            a NumPy array ``[x1, y1, x2, y2]``.
        kits_colors: Kit colour of each player, in the same order as
            ``players_boxes``. Entries are paired positionally; surplus
            entries on either side are ignored instead of raising.
        kits_clf: Classifier forwarded to ``classify_kits``.

    Returns:
        ``1`` when team 0 sits further right on average than team 1,
        otherwise ``0``. A team without players contributes an average of 0.
    """
    team_0_x = []
    team_1_x = []
    for player_box, kit_color in zip(players_boxes, kits_colors):
        x1 = player_box[0].astype(int)[0]
        team = classify_kits(kits_clf, [kit_color]).item()
        (team_0_x if team == 0 else team_1_x).append(x1)

    # Fall back to 0 for a team nobody was assigned to.
    avg_team_0 = np.average(team_0_x) if team_0_x else 0
    avg_team_1 = np.average(team_1_x) if team_1_x else 0

    return 1 if avg_team_0 - avg_team_1 > 0 else 0

def check_box_boundaries(boxes, img_height, img_width):
    """
    Keep only the boxes that lie inside the image and clamp their coordinates.

    A box is kept when ``0 <= x1 < x2 < img_width`` and
    ``0 <= y1 < y2 < img_height``; every other box is dropped (not clipped).
    The kept boxes are then clamped to ``[0, img_width - 1]`` /
    ``[0, img_height - 1]``, which only alters fractional coordinates that
    lie between the last pixel index and the image size.

    Args:
        boxes: array-like of shape (N, 4) with [x1, y1, x2, y2] format; an
            empty input or a single flat box of 4 values is accepted too
        img_height: height of the image
        img_width: width of the image

    Returns:
        valid_boxes: array of shape (K, 4) with the kept boxes (a copy; the
            input is not modified). K is 0 when nothing is valid.
        valid_indices: integer array of shape (K,) with the positions of the
            kept boxes in the input
    """
    boxes = np.asarray(boxes)
    if boxes.size == 0:
        return np.empty((0, 4), dtype=boxes.dtype), np.empty(0, dtype=np.intp)
    boxes = boxes.reshape(-1, 4)

    x1, y1, x2, y2 = boxes.T
    valid_mask = (
        (x1 >= 0) & (y1 >= 0)
        & (x2 < img_width) & (y2 < img_height)
        & (x1 < x2) & (y1 < y2)
    )

    valid_indices = np.flatnonzero(valid_mask)
    # Boolean indexing copies, so the clamping below never touches the input.
    valid_boxes = boxes[valid_mask]

    valid_boxes[:, [0, 2]] = np.clip(valid_boxes[:, [0, 2]], 0, img_width - 1)   # x1, x2
    valid_boxes[:, [1, 3]] = np.clip(valid_boxes[:, [1, 3]], 0, img_height - 1)  # y1, y2

    return valid_boxes, valid_indices

def _box_inside_frame(x1, y1, x2, y2, frame_height, frame_width):
    """Return True when the box is non-degenerate and lies fully inside the frame."""
    return 0 <= x1 < x2 < frame_width and 0 <= y1 < y2 < frame_height


def process_team_identification_batch(frames, results, kits_clf, left_team_label, grass_hsv):
    """
    Process team identification and label formatting for batch results.

    Detections whose integer box is degenerate or not fully inside the frame
    are discarded. Players (detector class 0) are split into left/right team
    using their kit colour when a classifier is available.

    Args:
        frames: list of frames
        results: list of detection results for each frame; every detection is
            a ``(box, score, cls)`` triple with ``box`` a NumPy array
            ``[x1, y1, x2, y2]``
        kits_clf: trained kit classifier, or None to skip team assignment
        left_team_label: classifier label of the left team
        grass_hsv: grass color in HSV format

    Returns:
        processed_results: one list per frame of dicts with the keys
        ``id`` (running index among the kept detections of that frame),
        ``bbox`` (``[x1, y1, x2, y2]`` ints), ``class_id`` and ``conf``
    """
    player_left, player_right = 6, 7  # Player-L (also the default) / Player-R
    # Detector class -> output class for non-player detections:
    # goalkeeper -> GK, ball -> Ball, 3 and 4 -> Referee.
    non_player_labels = {1: 1, 2: 0, 3: 3, 4: 3}

    processed_results = []

    for frame_idx, frame in enumerate(frames):
        frame_detections = results[frame_idx]

        if not frame_detections:
            processed_results.append([])
            continue

        frame_height, frame_width = frame.shape[:2]

        # Validate every box once; team assignment and output both use it.
        kept = []
        for box, score, cls in frame_detections:
            x1, y1, x2, y2 = (int(v) for v in box.astype(int))
            if _box_inside_frame(x1, y1, x2, y2, frame_height, frame_width):
                kept.append((int(cls), (x1, y1, x2, y2), score))

        # Position in `kept` -> team label predicted from the kit colour.
        player_team_map = {}
        if kits_clf is not None:
            player_slots = []
            players_imgs = []
            for slot, (label, (x1, y1, x2, y2), _) in enumerate(kept):
                if label != 0:
                    continue
                player_img = frame[y1:y2, x1:x2]
                if player_img.size > 0:  # Ensure valid image
                    player_slots.append(slot)
                    players_imgs.append(player_img)

            if players_imgs:
                kits_colors = get_kits_colors(players_imgs, grass_hsv)
                teams = classify_kits(kits_clf, kits_colors)
                for slot, team in zip(player_slots, teams):
                    player_team_map[slot] = team.item()

        frame_results = []
        for slot, (label, bbox, score) in enumerate(kept):
            if label == 0:
                if slot in player_team_map and player_team_map[slot] != left_team_label:
                    final_label = player_right
                else:
                    # Left team, or no team information for this player.
                    final_label = player_left
            else:
                # Unknown classes keep their original label.
                final_label = non_player_labels.get(label, label)

            frame_results.append({
                "id": slot,
                "bbox": list(bbox),
                "class_id": int(final_label),
                "conf": float(score)
            })

        processed_results.append(frame_results)

    return processed_results

def convert_numpy_types(obj):
    """Convert numpy types to native Python types for JSON serialization.

    Handles NumPy booleans, integers, floats and arrays, and recurses into
    dicts (values only; keys are left as they are), lists and tuples.
    Anything else is returned unchanged.

    Args:
        obj: Arbitrary (possibly nested) object.

    Returns:
        An equivalent structure built from native Python values; tuples stay
        tuples and lists stay lists.
    """
    if isinstance(obj, np.bool_):
        # np.bool_ is not a subclass of np.integer and json cannot encode it.
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        return {key: convert_numpy_types(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_numpy_types(item) for item in obj]
    if isinstance(obj, tuple):
        return tuple(convert_numpy_types(item) for item in obj)
    return obj

def pre_process_img(frames, scale):
    """Turn a list of frames into a normalised float32 batch.

    Every frame is resized to ``int(scale) x int(scale)``, the frames are
    stacked, the channel axis is moved in front of the spatial axes and the
    values are divided by 255.

    Args:
        frames: Iterable of images accepted by ``cv2.resize``.
        scale: Target side length in pixels.

    Returns:
        ``float32`` array with the axes ordered (frame, channel, row, column).
    """
    side = int(scale)
    resized = [cv2.resize(frame, (side, side)) for frame in frames]
    # Stacked frames have the channel axis last -> move it to position 1.
    batch = np.stack(resized).transpose(0, 3, 1, 2)
    return batch.astype(np.float32) / 255.0  # Normalize

def post_process_output(outputs, x_scale, y_scale, conf_thresh=0.6, nms_thresh=0.75,
                        ball_conf_thresh=0.55, ball_class_id=2):
    """Decode raw detector output into per-frame detections.

    Args:
        outputs: NumPy array of shape ``(B, 4 + num_classes, N)``. The first
            four channels are read as centre-x, centre-y, width, height; the
            remaining channels are per-class scores that are passed through a
            sigmoid here.
        x_scale: Factor applied to x coordinates of the decoded boxes.
        y_scale: Factor applied to y coordinates of the decoded boxes.
        conf_thresh: Minimum confidence for a prediction to be kept.
        nms_thresh: IoU threshold handed to ``batched_nms``.
        ball_conf_thresh: Lower threshold applied to the single most
            confident ball prediction of every frame, so that one ball can
            survive even below ``conf_thresh``.
        ball_class_id: Class index that denotes the ball.

    Returns:
        List with one entry per frame; each entry is a list of
        ``(box_xyxy, confidence, class_id)`` tuples of NumPy values (empty
        when the frame has no detections).
    """
    batch_count = outputs.shape[0]
    preds = torch.from_numpy(outputs).permute(0, 2, 1)  # (B, N, 4 + classes)
    boxes = preds[..., :4]
    class_scores = torch.sigmoid(preds[..., 4:])
    conf, class_id = class_scores.max(dim=2)

    mask = conf > conf_thresh

    # Ball rescue: per frame, also keep the best ball candidate if it clears
    # the (lower) ball threshold.
    for b in range(batch_count):
        ball_idx = torch.nonzero(class_id[b] == ball_class_id, as_tuple=True)[0]
        if ball_idx.numel() == 0:
            continue
        top = ball_idx[torch.argmax(conf[b, ball_idx])]
        if conf[b, top] > ball_conf_thresh:
            mask[b, top] = True

    batch_idx, pred_idx = mask.nonzero(as_tuple=True)

    if len(batch_idx) == 0:
        return [[] for _ in range(batch_count)]

    boxes = boxes[batch_idx, pred_idx]
    conf = conf[batch_idx, pred_idx]
    class_id = class_id[batch_idx, pred_idx]

    # Centre/size -> corner format, scaled with the supplied factors.
    x, y, w, h = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
    x1 = (x - w / 2) * x_scale
    y1 = (y - h / 2) * y_scale
    x2 = (x + w / 2) * x_scale
    y2 = (y + h / 2) * y_scale
    boxes_xyxy = torch.stack([x1, y1, x2, y2], dim=1)

    # Shift the boxes of every frame into their own coordinate band so boxes
    # from different frames cannot overlap; batched_nms additionally groups
    # by batch_idx. NMS is therefore per frame and shared across classes.
    max_coord = 1e4
    offset = batch_idx.to(boxes_xyxy) * max_coord
    boxes_for_nms = boxes_xyxy + offset[:, None]

    keep = batched_nms(boxes_for_nms, conf, batch_idx, nms_thresh)

    boxes_final = boxes_xyxy[keep]
    conf_final = conf[keep]
    class_final = class_id[keep]
    batch_final = batch_idx[keep]

    results = [[] for _ in range(batch_count)]
    for b in range(batch_count):
        in_frame = batch_final == b
        if not in_frame.any():
            continue
        results[b] = list(zip(boxes_final[in_frame].numpy(),
                              conf_final[in_frame].numpy(),
                              class_final[in_frame].numpy()))
    return results

def player_detection_result(frames: list[ndarray], batch_size, model, kits_clf=None, left_team_label=None, grass_hsv=None):
    """Run the detector over frames and attach team labels to the detections.

    Frames are processed in chunks of ``batch_size``. While any of
    ``kits_clf``, ``left_team_label`` or ``grass_hsv`` is still ``None``, the
    first frame of the current chunk is used to (re)estimate the grass
    colour, fit the kit classifier and determine the left team.

    Args:
        frames: Frames to process; the size of the first frame is used to
            scale boxes back from the 640x640 model input.
        batch_size: Number of frames sent to the model per call.
        model: Object exposing ``get_inputs()`` and ``run(None, feeds)``
            (presumably an ONNX Runtime session; confirm against the caller).
        kits_clf: Previously fitted kit classifier, if any.
        left_team_label: Previously determined left-team label, if any.
        grass_hsv: Previously estimated grass colour in HSV, if any.

    Returns:
        ``(results, kits_clf, left_team_label, grass_hsv)`` where ``results``
        holds one list of detection dicts per frame and the other three
        values are the (possibly updated) team-classification state to pass
        to the next call. For empty ``frames`` the result list is empty and
        the state is returned unchanged.
    """
    if len(frames) == 0:
        return [], kits_clf, left_team_label, grass_hsv

    height, width = frames[0].shape[:2]
    scale = 640.0
    x_scale = width / scale
    y_scale = height / scale

    # The input name does not depend on the batch, so resolve it once.
    input_name = model.get_inputs()[0].name

    results = []
    for start in range(0, len(frames), batch_size):
        # Slicing already shortens the final, possibly partial, batch.
        batch_frames = frames[start:start + batch_size]
        imgs = pre_process_img(batch_frames, scale)

        outputs = model.run(None, {input_name: imgs})[0]
        raw_results = post_process_output(np.array(outputs), x_scale, y_scale)

        if kits_clf is None or left_team_label is None or grass_hsv is None:
            # Use first frame to initialize team classification
            first_frame = batch_frames[0]
            first_frame_results = raw_results[0] if raw_results else []

            if first_frame_results:
                players_imgs, players_boxes = get_players_boxes(first_frame, first_frame_results)
                if players_imgs:
                    grass_color = get_grass_color(first_frame)
                    grass_hsv = cv2.cvtColor(np.uint8([[list(grass_color)]]), cv2.COLOR_BGR2HSV)
                    kits_colors = get_kits_colors(players_imgs, grass_hsv)
                    if kits_colors:  # Only proceed if we have valid kit colors
                        kits_clf = get_kits_classifier(kits_colors)
                        if kits_clf is not None:
                            left_team_label = int(get_left_team_label(players_boxes, kits_colors, kits_clf))

        # Process team identification and boundary checking
        processed_results = process_team_identification_batch(
            batch_frames, raw_results, kits_clf, left_team_label, grass_hsv
        )
        results.extend(convert_numpy_types(processed_results))

    # Return the same format as before for compatibility
    return results, kits_clf, left_team_label, grass_hsv