"""Multi-camera pipeline: 2D detection -> cross-view matching -> 3D metrics.

The entry point is :func:`run_pipeline`; the other functions are the stages
it chains together for every annotated action interval.
"""
import cv2
import math
import copy
import json
import numpy as np
import supervision as sv
from ultralytics import YOLO
import plotly.graph_objects as go
from collections import defaultdict, deque
from ViTPose.inference import VitInference
from scipy.optimize import linear_sum_assignment


def prepare_cameras(calibration_path):
    """Load per-camera calibration arrays from a NumPy archive.

    The archive is scanned for keys named ``Pn<id>``; a camera is kept only
    when ``Pn<id>``, ``K<id>`` and ``dist<id>`` are all present.

    Returns:
        Tuple ``(Pn_dict, K_dict, dist_dict)``, each keyed by integer camera id.

    Raises:
        ValueError: if no camera has all three arrays.
    """
    Pn_dict, K_dict, dist_dict = {}, {}, {}
    # The context manager closes the archive's file handle once the arrays
    # have been read into memory.
    with np.load(calibration_path) as data:
        names = set(data.files)
        cam_ids = sorted({int(k[2:]) for k in names if k.startswith("Pn")})
        for cid in cam_ids:
            p_key, k_key, d_key = f"Pn{cid}", f"K{cid}", f"dist{cid}"
            if p_key in names and k_key in names and d_key in names:
                Pn_dict[cid] = data[p_key]
                K_dict[cid] = data[k_key]
                dist_dict[cid] = data[d_key]
    if not Pn_dict:
        raise ValueError(f"No valid camera params found in: {calibration_path}")
    return Pn_dict, K_dict, dist_dict


def rescale_points(persons, balls, current_size, target_size):
    """Scale keypoints and ball boxes from ``current_size`` to ``target_size``.

    Both sizes are ``(width, height)``. New dictionaries are returned; the
    inputs are not modified.
    """
    sx = target_size[0] / current_size[0]
    sy = target_size[1] / current_size[1]
    persons = {
        pid: {idx: (x * sx, y * sy) for idx, (x, y) in skel.items()}
        for pid, skel in persons.items()
    }
    balls = {
        bid: (x1 * sx, y1 * sy, x2 * sx, y2 * sy)
        for bid, (x1, y1, x2, y2) in balls.items()
    }
    return persons, balls


def detect_entities(frame, result, pose_model, target_size, margin_ratio=0.35):
    """Extract tracked persons (as 2D skeletons) and balls from one frame.

    Args:
        frame: BGR image (converted to RGB before pose inference).
        result: tracker result exposing ``boxes`` with ``xyxy``, ``id``,
            ``conf`` and ``cls``.
        pose_model: object exposing ``inference_from_bboxes``.
        target_size: ``(width, height)`` the returned coordinates are scaled to.
        margin_ratio: fraction of the box size added on every side of a
            person box before pose estimation.

    Returns:
        ``(persons, balls)`` where ``persons`` maps track id ->
        ``{keypoint_index: (x, y)}`` and ``balls`` maps track id ->
        ``(x1, y1, x2, y2)``, both in ``target_size`` coordinates.
    """
    # Class ids as used by the detector output; presumably the COCO ids for
    # "person" and "sports ball" -- confirm against the model's label map.
    person_class_id = 0
    ball_class_id = 32
    person_min_conf = 0.5
    ball_min_conf = 0.25

    def iou(a, b):
        x1 = max(a[0], b[0])
        y1 = max(a[1], b[1])
        x2 = min(a[2], b[2])
        y2 = min(a[3], b[3])
        inter = max(0, x2 - x1) * max(0, y2 - y1)
        area_a = max(0, a[2] - a[0]) * max(0, a[3] - a[1])
        area_b = max(0, b[2] - b[0]) * max(0, b[3] - b[1])
        union = area_a + area_b - inter
        return inter / union if union > 0 else 0.0

    def remove_overlaps(boxes, ids, confs, thr=0.65):
        # Greedy suppression: visit boxes by descending confidence and keep a
        # box only if it does not overlap an already kept one by more than thr.
        keep = []
        for i in np.argsort(-confs):
            if all(iou(boxes[i], boxes[j]) <= thr for j in keep):
                keep.append(i)
        return boxes[keep], [ids[i] for i in keep], confs[keep]

    persons = {}
    balls = {}
    fh, fw = frame.shape[:2]
    current_size = (fw, fh)

    # Collected in plain lists and converted once, instead of growing NumPy
    # arrays inside the loop.
    box_rows, person_ids, conf_rows = [], [], []

    if result.boxes is not None and result.boxes.id is not None and len(result.boxes) > 0:
        boxes = result.boxes.xyxy.cpu().numpy()
        ids = result.boxes.id.int().cpu().tolist()
        confs = result.boxes.conf.cpu().numpy()
        class_ids = result.boxes.cls.int().cpu().tolist()

        for box, track_id, score, class_id in zip(boxes, ids, confs, class_ids):
            track_id = int(track_id)
            class_id = int(class_id)
            if class_id == person_class_id:
                if score < person_min_conf:
                    continue
                x1, y1, x2, y2 = box
                w = x2 - x1
                h = y2 - y1
                # Enlarge the box so limbs outside the detector box are still
                # visible to the pose model, clamped to the frame.
                box_rows.append([
                    max(0, x1 - margin_ratio * w),
                    max(0, y1 - margin_ratio * h),
                    min(fw, x2 + margin_ratio * w),
                    min(fh, y2 + margin_ratio * h),
                ])
                person_ids.append(track_id)
                conf_rows.append(score)
            elif class_id == ball_class_id:
                if score < ball_min_conf:
                    continue
                balls[track_id] = tuple(map(int, box))

    if box_rows:
        person_boxes = np.asarray(box_rows, dtype=np.float32).reshape(-1, 4)
        person_confs = np.asarray(conf_rows, dtype=np.float32)
        person_boxes, person_ids, person_confs = remove_overlaps(
            person_boxes, person_ids, person_confs, 0.75
        )
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        poses = pose_model.inference_from_bboxes(
            frame_rgb, person_boxes, ids=person_ids, scores=person_confs.tolist()
        )
        for person_id, keypoints in poses.items():
            skel = {}
            for i, point in enumerate(keypoints):
                # Each keypoint is unpacked as (y, x, score); stored as (x, y).
                y, x, score = point
                skel[i] = (float(x), float(y))
            persons[int(person_id)] = skel

    persons, balls = rescale_points(persons, balls, current_size, target_size)
    return persons, balls


def correspond(persons_2d, balls_2d, Pn_dict, K_dict, dist_dict, threshold_px=75):
    """Re-label per-camera detections so the same entity shares one id.

    The first camera (lowest id) is the reference. Every following camera is
    matched against all previously processed cameras using the Sampson
    epipolar distance, averaged over those cameras, and solved as an
    assignment problem. Unmatched detections receive fresh ids.

    Args:
        persons_2d: ``{camera_id: {track_id: {kpt_index: (x, y)}}}``.
        balls_2d: ``{camera_id: {track_id: (x1, y1, x2, y2)}}``.
        Pn_dict, K_dict, dist_dict: calibration keyed by camera id.
            ``Pn`` is used as a 3x4 ``[R | t]`` matrix.
        threshold_px: maximum accepted assignment cost.

    Returns:
        ``(persons_2d, balls_2d)``; the input dictionaries are updated in
        place (entries of non-reference cameras are replaced) and returned.
    """
    MATCH_KPTS = list(range(25))
    INF = 1e9

    def calc_F(Pni, Pnj, Ki, Kj):
        """Fundamental matrix mapping points of camera i to lines in camera j."""
        R1, t1 = Pni[:, :3], Pni[:, 3].reshape(3, 1)
        R2, t2 = Pnj[:, :3], Pnj[:, 3].reshape(3, 1)
        R21 = R2 @ R1.T
        t21 = t2 - R21 @ t1
        tx = np.array([
            [0, -t21[2, 0], t21[1, 0]],
            [t21[2, 0], 0, -t21[0, 0]],
            [-t21[1, 0], t21[0, 0], 0],
        ])
        E = tx @ R21
        F = np.linalg.inv(Kj).T @ E @ np.linalg.inv(Ki)
        # Enforce rank 2.
        U, S, Vt = np.linalg.svd(F)
        S[2] = 0.0
        F = U @ np.diag(S) @ Vt
        # Normalise by the Frobenius norm rather than by F[2, 2]: that entry
        # can legitimately be (close to) zero, and the Sampson distance does
        # not depend on the overall scale of F.
        scale = np.linalg.norm(F)
        return F / scale if scale > 1e-12 else F

    def undistort_pt(pt, K, dist):
        pt_arr = np.array([[pt]], dtype=np.float64)
        pt_ud = cv2.undistortPoints(pt_arr, K, dist, P=K)
        return np.array([pt_ud[0, 0, 0], pt_ud[0, 0, 1], 1.0])

    def sampson_dist(pti, ptj, Ki, Kj, disti, distj, F):
        x1 = undistort_pt(pti, Ki, disti)
        x2 = undistort_pt(ptj, Kj, distj)
        Fx1 = F @ x1
        Ftx2 = F.T @ x2
        num = (x2 @ Fx1) ** 2
        den = Fx1[0] ** 2 + Fx1[1] ** 2 + Ftx2[0] ** 2 + Ftx2[1] ** 2 + 1e-12
        return float(np.sqrt(abs(num / den)))

    def pair_cost_persons(ref_skl, cand_skl, Ki, Kj, di, dj, F, min_kpts=3):
        """Median keypoint distance after MAD-based outlier rejection."""
        dists = [
            sampson_dist(ref_skl[k], cand_skl[k], Ki, Kj, di, dj, F)
            for k in MATCH_KPTS
            if k in ref_skl and k in cand_skl
        ]
        if len(dists) < min_kpts:
            return INF
        dists = np.array(dists, dtype=float)
        if len(dists) >= 4:
            med = np.median(dists)
            mad = np.median(np.abs(dists - med))
            if mad > 1e-6:
                dists = dists[np.abs(dists - med) <= 2.5 * mad]
                if len(dists) < min_kpts:
                    return INF
        return float(np.median(dists))

    def pair_cost_ball(rc, cc, Ki, Kj, di, dj, F):
        return sampson_dist(rc, cc, Ki, Kj, di, dj, F)

    def hungarian_match(cost_matrix, row_ids, col_ids):
        if cost_matrix.size == 0:
            return {}
        row_ind, col_ind = linear_sum_assignment(cost_matrix)
        return {
            row_ids[r]: col_ids[c]
            for r, c in zip(row_ind, col_ind)
            if cost_matrix[r, c] < threshold_px
        }

    def apply_symmetry_gate(matches, cost_matrix, rid_idx, cids):
        # Drops a match when the reference row is ambiguous: its second-best
        # finite cost is within 30% of the best one AND the chosen cost is
        # not clearly small (above 40% of the threshold).
        col_of = {cid: i for i, cid in enumerate(cids)}
        accepted = {}
        for rid, cid in matches.items():
            ri = rid_idx[rid]
            ci = col_of[cid]
            forward_cost = cost_matrix[ri]
            valid = forward_cost[forward_cost < INF]
            if len(valid) >= 2:
                sorted_costs = np.sort(valid)
                if (sorted_costs[1] < 1.3 * sorted_costs[0]
                        and cost_matrix[ri, ci] > threshold_px * 0.4):
                    continue
            accepted[rid] = cid
        return accepted

    def avg_cost(acc, cnt):
        with np.errstate(invalid='ignore', divide='ignore'):
            return np.where(cnt > 0, acc / cnt, INF)

    def centre(b):
        return ((b[0] + b[2]) / 2, (b[1] + b[3]) / 2)

    cam_ids = sorted(persons_2d.keys())
    if not cam_ids:
        # Nothing to match.
        return persons_2d, balls_2d

    cam0 = cam_ids[0]
    # Fresh ids start above the reference camera's ids.
    next_person_id = max(persons_2d[cam0].keys(), default=-1) + 1
    next_ball_id = max(balls_2d[cam0].keys(), default=-1) + 1

    for idx in range(1, len(cam_ids)):
        cam_j = cam_ids[idx]
        prev_cams = cam_ids[:idx]
        curr_persons = persons_2d[cam_j]
        curr_balls = balls_2d[cam_j]

        p_rids = sorted({rid for cam in prev_cams for rid in persons_2d[cam]})
        p_cids = list(curr_persons.keys())
        b_rids = sorted({rid for cam in prev_cams for rid in balls_2d[cam]})
        b_cids = list(curr_balls.keys())

        P_acc = np.zeros((len(p_rids), len(p_cids))) if (p_rids and p_cids) else None
        P_cnt = np.zeros_like(P_acc) if P_acc is not None else None
        B_acc = np.zeros((len(b_rids), len(b_cids))) if (b_rids and b_cids) else None
        B_cnt = np.zeros_like(B_acc) if B_acc is not None else None

        p_rid_idx = {rid: i for i, rid in enumerate(p_rids)}
        b_rid_idx = {rid: i for i, rid in enumerate(b_rids)}

        # Accumulate costs against every already-labelled camera.
        for cam_i in prev_cams:
            Ki, Kj = K_dict[cam_i], K_dict[cam_j]
            di, dj = dist_dict[cam_i], dist_dict[cam_j]
            F = calc_F(Pn_dict[cam_i], Pn_dict[cam_j], Ki, Kj)

            if P_acc is not None:
                for rid, rskl in persons_2d[cam_i].items():
                    ri = p_rid_idx[rid]
                    for ci, cid in enumerate(p_cids):
                        c = pair_cost_persons(rskl, curr_persons[cid], Ki, Kj, di, dj, F)
                        if c < INF:
                            P_acc[ri, ci] += c
                            P_cnt[ri, ci] += 1

            if B_acc is not None:
                for rid, rb in balls_2d[cam_i].items():
                    ri = b_rid_idx[rid]
                    rc = centre(rb)
                    for ci, cid in enumerate(b_cids):
                        c = pair_cost_ball(rc, centre(curr_balls[cid]), Ki, Kj, di, dj, F)
                        if c < INF:
                            B_acc[ri, ci] += c
                            B_cnt[ri, ci] += 1

        ordered_persons = {}
        used_p = set()
        if P_acc is not None:
            P_cost = avg_cost(P_acc, P_cnt)
            person_matches = apply_symmetry_gate(
                hungarian_match(P_cost, p_rids, p_cids), P_cost, p_rid_idx, p_cids
            )
            for rid, cid in person_matches.items():
                ordered_persons[rid] = curr_persons[cid]
                used_p.add(cid)
        for cid in p_cids:
            if cid not in used_p:
                ordered_persons[next_person_id] = curr_persons[cid]
                next_person_id += 1

        ordered_balls = {}
        used_b = set()
        if B_acc is not None:
            B_cost = avg_cost(B_acc, B_cnt)
            ball_matches = apply_symmetry_gate(
                hungarian_match(B_cost, b_rids, b_cids), B_cost, b_rid_idx, b_cids
            )
            for rid, cid in ball_matches.items():
                ordered_balls[rid] = curr_balls[cid]
                used_b.add(cid)
        for cid in b_cids:
            if cid not in used_b:
                ordered_balls[next_ball_id] = curr_balls[cid]
                next_ball_id += 1

        persons_2d[cam_j] = ordered_persons
        balls_2d[cam_j] = ordered_balls

    return persons_2d, balls_2d


def stereo(persons_2d, balls_2d, Pn_dict, K_dict, dist_dict):
    """Triangulate matched 2D skeletons and ball centres into 3D.

    An entity (or a single keypoint) is triangulated only when it is visible
    in at least two cameras. Points whose homogeneous solution is degenerate
    are skipped.

    Returns:
        ``(persons_3d, balls_3d)``: ``{person_id: {kpt_index: xyz}}`` and
        ``{ball_id: xyz}`` with ``xyz`` a length-3 NumPy array.
    """
    def triangulate(points_2d, projection_matrices):
        # Linear (DLT) triangulation: two equations per view, solved by SVD.
        rows = []
        for (u, v), P in zip(points_2d, projection_matrices):
            rows.append(u * P[2, :] - P[0, :])
            rows.append(v * P[2, :] - P[1, :])
        _, _, Vh = np.linalg.svd(np.array(rows))
        X_h = Vh[-1, :]
        if abs(X_h[3]) < 1e-12:
            # Point at infinity: no finite 3D position.
            return None
        return (X_h[:3] / X_h[3]).flatten()

    def undistort_point(pt, K, dist):
        pt_arr = np.array(pt, dtype=np.float32).reshape(1, 1, 2)
        pt_ud = cv2.undistortPoints(pt_arr, K, dist, P=K)
        return pt_ud[0, 0]

    def triangulate_views(views):
        """Triangulate ``[(camera_id, (x, y)), ...]``; None if < 2 views."""
        if len(views) < 2:
            return None
        pts = [undistort_point(pt, K_dict[cid], dist_dict[cid]) for cid, pt in views]
        mats = [K_dict[cid] @ Pn_dict[cid] for cid, _ in views]
        return triangulate(pts, mats)

    persons_3d = {}
    all_person_ids = set()
    for cam_persons in persons_2d.values():
        all_person_ids.update(cam_persons.keys())

    for pid in all_person_ids:
        person_cams = {
            cid: c_pers[pid] for cid, c_pers in persons_2d.items() if pid in c_pers
        }
        if len(person_cams) < 2:
            continue

        all_kpt_indices = set()
        for skel in person_cams.values():
            all_kpt_indices.update(skel.keys())

        person_3d = {}
        for kpt_idx in all_kpt_indices:
            views = [
                (cid, skel[kpt_idx])
                for cid, skel in person_cams.items()
                if kpt_idx in skel
            ]
            point = triangulate_views(views)
            if point is not None:
                person_3d[kpt_idx] = point
        if person_3d:
            persons_3d[pid] = person_3d

    balls_3d = {}
    all_ball_ids = set()
    for cam_balls in balls_2d.values():
        all_ball_ids.update(cam_balls.keys())

    for bid in all_ball_ids:
        views = []
        for cid, c_balls in balls_2d.items():
            if bid in c_balls:
                bbox = c_balls[bid]
                center = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)
                views.append((cid, center))
        point = triangulate_views(views)
        if point is not None:
            balls_3d[bid] = point

    return persons_3d, balls_3d


def load_action_intervals(json_path, fps):
    """Read annotated actions and convert their timecodes to frame numbers.

    The JSON document must be an object with a non-empty ``"actions"`` list;
    each action is an object with ``"label"``, ``"start"`` and ``"end"``,
    the latter two as ``HH:MM:SS:FF`` strings.

    Returns:
        List of ``{"label", "start_frame", "end_frame"}`` dictionaries.

    Raises:
        ValueError: on any structural or timecode error.
    """
    timecode_fps = max(1, int(round(float(fps))))

    def timestamp_to_frame(timestamp, action_number, field_name):
        if not isinstance(timestamp, str):
            raise ValueError(f"Invalid time format in action {action_number}. {field_name} must use HH:MM:SS:FF.")
        parts = timestamp.strip().split(":")
        if len(parts) != 4:
            raise ValueError(f'Invalid time format "{timestamp}" in action {action_number}. Use HH:MM:SS:FF.')
        try:
            h, m, s, f = [int(part) for part in parts]
        except ValueError as err:
            raise ValueError(f'Invalid time format "{timestamp}" in action {action_number}. Use integer timecode values.') from err
        if h < 0 or m < 0 or m > 59 or s < 0 or s > 59 or f < 0:
            raise ValueError(f'Invalid time format "{timestamp}" in action {action_number}. Minutes and seconds must be between 00 and 59.')
        if f >= timecode_fps:
            raise ValueError(
                f'Invalid time format "{timestamp}" in action {action_number}. '
                f'At {timecode_fps}fps, frame values must be between 00 and {timecode_fps - 1:02d}.'
            )
        return int((h * 3600 + m * 60 + s) * timecode_fps + f)

    with open(json_path, "r", encoding="utf-8") as fh:
        data = json.load(fh)

    raw_actions = data.get("actions") if isinstance(data, dict) else None
    if not isinstance(raw_actions, list) or not raw_actions:
        raise ValueError("No actions array found.")

    actions = []
    for action_number, item in enumerate(raw_actions, start=1):
        if not isinstance(item, dict):
            raise ValueError(f"Action {action_number} must be an object.")
        label = item.get("label")
        if not isinstance(label, str) or not label.strip():
            raise ValueError(f"Action {action_number} must include a label.")
        start_frame = timestamp_to_frame(item.get("start"), action_number, "start")
        end_frame = timestamp_to_frame(item.get("end"), action_number, "end")
        if end_frame < start_frame:
            raise ValueError(f"Invalid action interval in action {action_number}. End must not be before start.")
        actions.append({"label": label.strip(), "start_frame": start_frame, "end_frame": end_frame})
    return actions


def process_action_interval(cameras, sizes, action, yolo_path, pose_model, Pn_dict, K_dict, dist_dict, connections, progress_cb=None, current_act=0, total_act=1):
    """Run detection, matching and triangulation over one action interval.

    Args:
        cameras: ``{camera_id: video_path}``.
        sizes: mapping providing ``'TARGET_SIZE'`` and ``'YOLO_IMGSZ'``.
        action: dict with ``"start_frame"`` and ``"end_frame"`` (inclusive).
        yolo_path: weights path; one tracker model is created per camera.
        pose_model: passed through to :func:`detect_entities`.
        Pn_dict, K_dict, dist_dict: calibration keyed by camera id.
        connections: accepted for interface compatibility; not used here.
        progress_cb: optional ``cb(current_act, total_act, step, total_frames)``;
            returning ``False`` cancels processing.

    Returns:
        ``{frame_no: {"persons_3d", "persons_2d", "balls_3d", "balls_2d"}}``.
        Processing stops early once fewer than two cameras deliver a frame.

    Raises:
        InterruptedError: when ``progress_cb`` returns ``False``.
    """
    target_size = sizes['TARGET_SIZE']
    yolo_imgsz = sizes['YOLO_IMGSZ']
    start_frame = action["start_frame"]
    end_frame = action["end_frame"]
    total_frames = end_frame - start_frame + 1

    cameras_caps = {}
    cameras_models = {}
    history = {}
    try:
        for camera, path in cameras.items():
            cameras_caps[camera] = cv2.VideoCapture(path)
            # A separate model per camera keeps each tracker's state isolated.
            cameras_models[camera] = YOLO(yolo_path)

        for cap in cameras_caps.values():
            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        for step, frame_no in enumerate(range(start_frame, end_frame + 1)):
            if progress_cb:
                # Check if cancellation was requested
                should_continue = progress_cb(current_act, total_act, step, max(1, total_frames))
                if should_continue is False:
                    raise InterruptedError("Processing cancelled by user")

            cameras_frames = {}
            for camera in cameras:
                ret, frame = cameras_caps[camera].read()
                if ret:
                    cameras_frames[camera] = frame

            if len(cameras_frames) < 2:
                break

            # Rebuilt every frame: a camera that failed to deliver this frame
            # contributes no detections instead of stale ones from an earlier
            # frame (or None on the very first frame).
            cameras_persons = {}
            cameras_balls = {}
            for camera in cameras:
                if camera not in cameras_frames:
                    cameras_persons[camera] = {}
                    cameras_balls[camera] = {}
                    continue
                result = cameras_models[camera].track(
                    cameras_frames[camera], persist=True, verbose=False, imgsz=yolo_imgsz
                )[0]
                persons, balls = detect_entities(cameras_frames[camera], result, pose_model, target_size)
                cameras_persons[camera] = persons
                cameras_balls[camera] = balls

            cameras_persons, cameras_balls = correspond(cameras_persons, cameras_balls, Pn_dict, K_dict, dist_dict)
            persons_3d, balls_3d = stereo(cameras_persons, cameras_balls, Pn_dict, K_dict, dist_dict)

            history[frame_no] = {
                "persons_3d": copy.deepcopy(persons_3d),
                "persons_2d": copy.deepcopy(cameras_persons),
                "balls_3d": copy.deepcopy(balls_3d),
                "balls_2d": copy.deepcopy(cameras_balls),
            }
    finally:
        # Release the video handles on every exit path, including
        # cancellation and unexpected errors.
        for cap in cameras_caps.values():
            cap.release()

    return history


def proximity_target(history, middle):
    """Find the ball/player pair that is closest around the middle frame.

    Frames ``middle-3 .. middle+3`` are scanned in order. In the first frame
    where the globally closest (ball, person) pair is within 150 units, that
    pair is returned. A person's position is the midpoint of keypoints 17 and
    18 (the ankles); persons lacking either keypoint are skipped.

    Returns:
        ``(ball_id, person_id, distance)`` or ``None`` if no frame qualifies.
    """
    # Same unit as the triangulated coordinates; presumably centimetres --
    # confirm against the calibration.
    max_distance = 150

    for i in range(middle - 3, middle + 4):
        frame_data = history.get(i)
        if frame_data is None:
            continue
        if (not frame_data['persons_3d']) or (not frame_data['balls_3d']):
            continue

        best_ball = None
        best_person = None
        best_distance = float('inf')
        for ball_id, ball_center in frame_data['balls_3d'].items():
            if ball_center is None:
                continue
            for person_id, person_skel in frame_data['persons_3d'].items():
                l_ankle = person_skel.get(17)
                r_ankle = person_skel.get(18)
                if l_ankle is None or r_ankle is None:
                    # Not every keypoint is triangulated for every person.
                    continue
                person_center = (np.array(l_ankle) + np.array(r_ankle)) / 2
                distance = math.dist(person_center, ball_center)
                if distance < best_distance:
                    best_distance = distance
                    best_person = person_id
                    best_ball = ball_id

        if best_distance <= max_distance:
            return (best_ball, best_person, best_distance)

    return None


def analyze(window, target_player_id, action_type, zone_entry_ball_pos=None):
    """Compute biomechanical metrics for one player over a frame window.

    Args:
        window: ``{frame_no: {"persons_3d": ..., "balls_3d": ...}}``.
        target_player_id: id of the analysed player in ``persons_3d``.
        action_type: ``"Dribble"`` yields a per-frame report; ``"Pass"`` and
            ``"Shoot"`` are treated alike; any other label uses the remaining
            branch. The middle frame of the window is taken as the touch frame.
        zone_entry_ball_pos: optional reference ball position; defaults to
            the first ball position found in the window.

    Returns:
        A report dictionary, or ``None`` when the window is empty for a
        non-dribble action.
    """
    def kpt(skel, i):
        v = skel.get(i)
        return np.array(v, float) if v is not None else None

    def dist(p1, p2):
        if p1 is None or p2 is None:
            return None
        return float(np.linalg.norm(np.array(p1, float) - np.array(p2, float)))

    def vec_angle(v1, v2):
        """Angle in degrees between two vectors; None for a zero vector."""
        v1, v2 = np.array(v1, float), np.array(v2, float)
        n1, n2 = np.linalg.norm(v1), np.linalg.norm(v2)
        if n1 < 1e-9 or n2 < 1e-9:
            return None
        cosine = np.clip(np.dot(v1 / n1, v2 / n2), -1.0, 1.0)
        return round(float(np.degrees(np.arccos(cosine))), 3)

    def angle3(a, b, c):
        """Angle at vertex ``b`` formed by points ``a`` and ``c``."""
        if a is None or b is None or c is None:
            return None
        return vec_angle(np.array(a, float) - np.array(b, float),
                         np.array(c, float) - np.array(b, float))

    def mean_or_none(a, b):
        return round((a + b) / 2.0, 3) if a is not None and b is not None else None

    def find_ball(p3d, b3d, pid):
        """Ball closest to the player's ankle midpoint: ``(id, position)``."""
        if not b3d:
            return None, None
        skel = p3d.get(pid, {})
        # ViTPose 25: l_ankle=17, r_ankle=18
        la, ra = kpt(skel, 17), kpt(skel, 18)
        pivot = (la + ra) / 2.0 if la is not None and ra is not None else None
        best_bid, best_pos, best_d = None, None, float('inf')
        for bid, bpos in b3d.items():
            if bpos is None:
                continue
            bpos = np.array(bpos, float)
            # Without a pivot every ball scores 0, so the first one wins.
            d = float(np.linalg.norm(pivot - bpos)) if pivot is not None else 0.0
            if d < best_d:
                best_d, best_bid, best_pos = d, bid, bpos
        return best_bid, best_pos

    def compute_features(p3d, b3d, pid):
        skel = p3d.get(pid)
        if skel is None:
            return None

        def g(i):
            return kpt(skel, i)

        # ViTPose 25 keypoint indices:
        # 0=nose, 1=l_eye, 2=r_eye, 3=l_ear, 4=r_ear
        # 5=neck
        # 6=l_shoulder, 7=r_shoulder
        # 8=l_elbow, 9=r_elbow
        # 10=l_wrist, 11=r_wrist
        # 12=l_hip, 13=r_hip
        # 15=l_knee, 16=r_knee
        # 17=l_ankle, 18=r_ankle
        # 19=l_big_toe, 20=l_small_toe, 21=l_heel
        # 22=r_big_toe, 23=r_small_toe, 24=r_heel
        nose, l_eye, r_eye = g(0), g(1), g(2)
        l_shoulder, r_shoulder = g(6), g(7)
        l_hip, r_hip = g(12), g(13)
        l_knee, r_knee = g(15), g(16)
        l_ankle, r_ankle = g(17), g(18)
        l_heel, r_heel = g(21), g(24)
        l_toe, r_toe = g(19), g(22)

        _, ball_center = find_ball(p3d, b3d, pid)

        l_knee_angle = angle3(l_hip, l_knee, l_ankle)
        r_knee_angle = angle3(r_hip, r_knee, r_ankle)

        head_angle = mid_shoulder = mid_hip = None
        if all(v is not None for v in (l_shoulder, r_shoulder, l_hip, r_hip)):
            mid_shoulder = (l_shoulder + r_shoulder) / 2.0
            mid_hip = (l_hip + r_hip) / 2.0
            if nose is not None:
                head_angle = angle3(nose, mid_shoulder, mid_hip)

        trunc_pitch_angle = trunc_roll_angle = head_pitch_angle = head_roll_angle = None
        if mid_shoulder is not None and mid_hip is not None:
            # Body-local frame centred on the mid-hip: X along the hip line
            # projected to the mid-hip height, Y the world Y axis, Z = X x Y.
            Y = np.array([0.0, 1.0, 0.0])
            Hv = np.array([l_hip[0], mid_hip[1], l_hip[2]]) - mid_hip
            Hvn = np.linalg.norm(Hv) or 1.0
            X_ = Hv / Hvn
            cross = np.cross(X_, Y)
            cn = np.linalg.norm(cross)
            if cn < 1e-9:
                cross, cn = np.array([0.0, 0.0, 1.0]), 1.0
            Z_ = cross / cn
            R = np.column_stack((X_, Y, Z_))

            def loc(pt):
                return R.T @ (pt - mid_hip)

            msl = loc(mid_shoulder)
            trunc_pitch_angle = angle3((0, msl[1], msl[2]), (0, 0, 0), (0, -1, 0))
            trunc_roll_angle = angle3((msl[0], msl[1], 0), (0, 0, 0), (0, -1, 0))
            if nose is not None and l_eye is not None and r_eye is not None:
                meil = loc((l_eye + r_eye) / 2.0)
                nloc = loc(nose)
                head_pitch_angle = angle3((0, msl[1], msl[2]), (0, meil[1], meil[2]), (0, nloc[1], nloc[2]))
                head_roll_angle = angle3((msl[0], msl[1], 0), (meil[0], meil[1], 0), (nloc[0], nloc[1], 0))

        left_foot_orientation_angle = right_foot_orientation_angle = difference_in_angles = None
        if all(v is not None for v in (l_hip, r_hip, l_heel, r_heel, l_toe, r_toe)):
            body_dir = r_hip - l_hip
            body_fwd = np.array([-body_dir[2], 0.0, body_dir[0]])
            bfwd_2d = body_fwd[[0, 2]]
            bn = np.linalg.norm(bfwd_2d)
            # Angles are only meaningful against a non-degenerate forward
            # direction and a non-degenerate heel->toe vector.
            if bn > 1e-6:
                bfwd_2d /= bn
                for foot_heel, foot_toe, side in ((l_heel, l_toe, "left"), (r_heel, r_toe, "right")):
                    fv = (foot_toe - foot_heel)[[0, 2]]
                    fn_ = np.linalg.norm(fv)
                    if fn_ > 1e-6:
                        fv /= fn_
                        ang = float(np.degrees(np.arccos(np.clip(np.dot(fv, bfwd_2d), -1.0, 1.0))))
                        if side == "left":
                            left_foot_orientation_angle = round(ang, 3)
                        else:
                            right_foot_orientation_angle = round(ang, 3)
                if left_foot_orientation_angle is not None and right_foot_orientation_angle is not None:
                    difference_in_angles = abs(left_foot_orientation_angle - right_foot_orientation_angle)

        l_r_foot_distance = None
        if all(v is not None for v in (l_ankle, r_ankle, l_knee)):
            akl = dist(l_ankle, l_knee)
            if akl and akl > 0:
                l_r_foot_distance = round(dist(l_ankle, r_ankle), 3)

        l_foot_ball_distance = r_foot_ball_distance = None
        if ball_center is not None:
            if l_ankle is not None:
                l_foot_ball_distance = round(float(dist(l_ankle, ball_center)), 3)
            if r_ankle is not None:
                r_foot_ball_distance = round(float(dist(r_ankle, ball_center)), 3)

        return {
            "head_angle": head_angle,
            "l_knee_angle": l_knee_angle,
            "r_knee_angle": r_knee_angle,
            "trunc_pitch_angle": trunc_pitch_angle,
            "trunc_roll_angle": trunc_roll_angle,
            "head_pitch_angle": head_pitch_angle,
            "head_roll_angle": head_roll_angle,
            "left_foot_orientation_angle": left_foot_orientation_angle,
            "right_foot_orientation_angle": right_foot_orientation_angle,
            "difference_in_angles": difference_in_angles,
            "l_r_foot_distance": l_r_foot_distance,
            "l_foot_ball_distance": l_foot_ball_distance,
            "r_foot_ball_distance": r_foot_ball_distance,
            "ball_position": ball_center.tolist() if ball_center is not None else None,
            "l_ankle": l_ankle,
            "r_ankle": r_ankle,
        }

    def body_to_ball_angle(skel, ball_pos):
        if ball_pos is None:
            return None
        # ViTPose 25: l_shoulder=6, r_shoulder=7
        l_shoulder, r_shoulder = kpt(skel, 6), kpt(skel, 7)
        if l_shoulder is None or r_shoulder is None:
            return None
        forward = r_shoulder - l_shoulder
        forward[1] = 0.0
        # ViTPose 25: l_ankle=17, r_ankle=18
        l_ankle, r_ankle = kpt(skel, 17), kpt(skel, 18)
        if l_ankle is None or r_ankle is None:
            # fallback to heels: l_heel=21, r_heel=24
            l_ankle, r_ankle = kpt(skel, 21), kpt(skel, 24)
        if l_ankle is None or r_ankle is None:
            # Neither ankles nor heels are available.
            return None
        feet_mid = (l_ankle + r_ankle) / 2.0
        to_ball = ball_pos - feet_mid
        to_ball[1] = 0.0
        return vec_angle(forward, to_ball)

    def body_vs_ball(skel, ball_travel_vec):
        # ViTPose 25: l_hip=12, r_hip=13
        lh, rh = kpt(skel, 12), kpt(skel, 13)
        if lh is None or rh is None:
            return None
        body_dir = rh - lh
        forward = np.array([-body_dir[2], 0.0, body_dir[0]])
        bv = np.array(ball_travel_vec, float).copy()
        bv[1] = 0.0
        return vec_angle(forward, bv)

    def leg_separation_angle(skel, active_foot):
        # ViTPose 25: l_heel=21, r_heel=24, l_hip=12, r_hip=13
        l_heel, r_heel = kpt(skel, 21), kpt(skel, 24)
        l_hip, r_hip = kpt(skel, 12), kpt(skel, 13)
        if any(x is None for x in (l_heel, r_heel, l_hip, r_hip)):
            return None
        if active_foot == "Left":
            return vec_angle(l_hip - l_heel, r_hip - r_heel)
        return vec_angle(r_hip - r_heel, l_hip - l_heel)

    def elbow_angle(skel, side):
        # ViTPose 25: l_shoulder=6, l_elbow=8, l_wrist=10
        #             r_shoulder=7, r_elbow=9, r_wrist=11
        if side == 'left':
            shoulder, elbow, wrist = kpt(skel, 6), kpt(skel, 8), kpt(skel, 10)
        else:
            shoulder, elbow, wrist = kpt(skel, 7), kpt(skel, 9), kpt(skel, 11)
        if any(x is None for x in (shoulder, elbow, wrist)):
            return None
        return angle3(shoulder, elbow, wrist)

    def frame_parts(frame_no):
        fd = window[frame_no]
        p3d, b3d = fd.get("persons_3d", {}), fd.get("balls_3d", {})
        return p3d, b3d, p3d.get(target_player_id, {})

    BALL_RADIUS_CM = 11

    # Per-frame series shared by every report type, in frame order.
    series = {
        "left_knee_angles": [],
        "right_knee_angles": [],
        "torso_pitch_angles": [],
        "head_angles": [],
        "mid_foot_ball_distances": [],
        "left_right_foot_distances": [],
    }

    def record_series(feat):
        series["left_knee_angles"].append(feat.get("l_knee_angle"))
        series["right_knee_angles"].append(feat.get("r_knee_angle"))
        series["torso_pitch_angles"].append(feat.get("trunc_pitch_angle"))
        series["head_angles"].append(feat.get("head_angle"))
        series["mid_foot_ball_distances"].append(
            mean_or_none(feat.get("l_foot_ball_distance"), feat.get("r_foot_ball_distance"))
        )
        series["left_right_foot_distances"].append(feat.get("l_r_foot_distance"))

    sorted_frames = sorted(window.keys())
    n = len(sorted_frames)

    zone_entry_ball_pos = np.array(zone_entry_ball_pos, float) if zone_entry_ball_pos is not None else None
    if zone_entry_ball_pos is None:
        for fn in sorted_frames:
            p3d, b3d, _ = frame_parts(fn)
            _, bpos = find_ball(p3d, b3d, target_player_id)
            if bpos is not None:
                zone_entry_ball_pos = bpos
                break

    if action_type == "Dribble":
        results = []
        for fn in sorted_frames:
            p3d, b3d, skel = frame_parts(fn)
            _, ball_pos = find_ball(p3d, b3d, target_player_id)
            feat = compute_features(p3d, b3d, target_player_id) or {}
            # ViTPose 25: l_ankle=17, r_ankle=18
            l_ankle, r_ankle = kpt(skel, 17), kpt(skel, 18)
            feet_mid = (l_ankle + r_ankle) / 2.0 if l_ankle is not None and r_ankle is not None else None
            ball_feet_dist = (
                round(float(np.linalg.norm(ball_pos - feet_mid)), 3)
                if ball_pos is not None and feet_mid is not None else None
            )
            results.append({
                "frame": fn,
                "ball_feet_distance": ball_feet_dist,
                "trunk_pitch": feat.get("trunc_pitch_angle"),
                "trunk_roll": feat.get("trunc_roll_angle"),
                "head_angle": feat.get("head_angle"),
                "left_elbow_angle": elbow_angle(skel, 'left'),
                "right_elbow_angle": elbow_angle(skel, 'right'),
            })
            record_series(feat)
        return {"frames": results, **series}

    if n == 0:
        # No frame can serve as the touch frame; callers treat None as
        # "insufficient data".
        return None

    touch_idx = (n - 1) // 2
    touch_frame_no = sorted_frames[touch_idx]
    touch_p3d, touch_b3d, touch_skel = frame_parts(touch_frame_no)
    _, touch_ball = find_ball(touch_p3d, touch_b3d, target_player_id)

    # Ball displacement from the reference position to the touch frame.
    fixed_ball_travel_vec = (
        (touch_ball - zone_entry_ball_pos)
        if zone_entry_ball_pos is not None and touch_ball is not None else None
    )

    first_skel = window[sorted_frames[0]].get("persons_3d", {}).get(target_player_id, {})
    # ViTPose 25: l_ankle=17, r_ankle=18
    fl, fr = kpt(first_skel, 17), kpt(first_skel, 18)
    cl, cr = kpt(touch_skel, 17), kpt(touch_skel, 18)
    # The active foot is the ankle that moved further between the first
    # frame and the touch frame; "Left" when that cannot be determined.
    active_foot = "Left"
    if all(v is not None for v in (fl, fr, cl, cr)):
        active_foot = "Left" if (dist(cl, fl) or 0.0) > (dist(cr, fr) or 0.0) else "Right"

    pre_start_idx = 0
    is_pass_like = action_type in ("Pass", "Shoot")

    pre_entries = []
    for fi in range(pre_start_idx, touch_idx):
        fn = sorted_frames[fi]
        p3d, b3d, skel = frame_parts(fn)
        feat = compute_features(p3d, b3d, target_player_id) or {}
        _, ball_pos = find_ball(p3d, b3d, target_player_id)
        if is_pass_like:
            pre_entries.append({"frame": fn, "body_to_ball_angle": body_to_ball_angle(skel, ball_pos)})
        else:
            bov = body_vs_ball(skel, fixed_ball_travel_vec) if fixed_ball_travel_vec is not None else None
            pre_entries.append({"frame": fn, "body_orientation_vs_ball": bov, "head_angle": feat.get("head_angle")})
        record_series(feat)

    touch_feat = compute_features(touch_p3d, touch_b3d, target_player_id) or {}
    action_body_ball_angle = body_to_ball_angle(touch_skel, touch_ball)
    record_series(touch_feat)

    if is_pass_like:
        action_data = {
            "body_to_ball_angle": action_body_ball_angle,
            "l_r_foot_distance": touch_feat.get("l_r_foot_distance"),
            "trunc_pitch_angle": touch_feat.get("trunc_pitch_angle"),
            "trunc_roll_angle": touch_feat.get("trunc_roll_angle"),
            "left_foot_orientation_angle": touch_feat.get("left_foot_orientation_angle"),
            "right_foot_orientation_angle": touch_feat.get("right_foot_orientation_angle"),
            "difference_in_angles": touch_feat.get("difference_in_angles"),
            "l_knee_angle": touch_feat.get("l_knee_angle"),
            "r_knee_angle": touch_feat.get("r_knee_angle"),
            "head_angle": touch_feat.get("head_angle"),
            "head_pitch_angle": touch_feat.get("head_pitch_angle"),
            "head_roll_angle": touch_feat.get("head_roll_angle"),
            "active_foot": active_foot,
        }
    else:
        bov_vals = [
            e["body_orientation_vs_ball"] for e in pre_entries
            if e.get("body_orientation_vs_ball") is not None
        ]
        action_data = {
            "head_angle": touch_feat.get("head_angle"),
            "l_knee_angle": touch_feat.get("l_knee_angle"),
            "r_knee_angle": touch_feat.get("r_knee_angle"),
            "trunc_pitch_angle": touch_feat.get("trunc_pitch_angle"),
            "trunc_roll_angle": touch_feat.get("trunc_roll_angle"),
            "left_foot_orientation_angle": touch_feat.get("left_foot_orientation_angle"),
            "right_foot_orientation_angle": touch_feat.get("right_foot_orientation_angle"),
            "difference_in_angles": touch_feat.get("difference_in_angles"),
            "l_r_foot_distance": touch_feat.get("l_r_foot_distance"),
            "avg_body_orientation_vs_ball": round(sum(bov_vals) / len(bov_vals), 3) if bov_vals else None,
        }

    # NOTE(review): the two touch-frame extras below are added for every
    # action type -- confirm whether they were meant for one branch only.
    if touch_skel and fixed_ball_travel_vec is not None:
        action_data["body_orientation_vs_ball"] = body_vs_ball(touch_skel, fixed_ball_travel_vec)
    if touch_skel and touch_ball is not None:
        # ViTPose 25: l_heel=21, r_heel=24
        heel = kpt(touch_skel, 21 if active_foot == "Left" else 24)
        if heel is not None:
            action_data["active_foot_height_pct"] = round(
                float(heel[1] - touch_ball[1]) / BALL_RADIUS_CM * 100.0, 3
            )

    def max_leg_separation(frame_indices):
        angles = [
            a for fi in frame_indices
            if (a := leg_separation_angle(
                window[sorted_frames[fi]].get("persons_3d", {}).get(target_player_id, {}),
                active_foot,
            )) is not None
        ]
        return max(angles) if angles else None

    backward_weighted_angle = forward_weighted_angle = None
    if is_pass_like:
        backward_weighted_angle = max_leg_separation(range(pre_start_idx, touch_idx + 1))
        forward_weighted_angle = max_leg_separation(range(touch_idx + 1, n))

    post_buffer = []
    for fi in range(touch_idx + 1, n):
        fn = sorted_frames[fi]
        p3d, b3d, skel = frame_parts(fn)
        feat = compute_features(p3d, b3d, target_player_id) or {}
        if is_pass_like:
            _, ball_pos = find_ball(p3d, b3d, target_player_id)
            post_buffer.append({
                "frame": fn,
                "head_angle": feat.get("head_angle"),
                "body_to_ball_angle": body_to_ball_angle(skel, ball_pos),
            })
        else:
            # Missing distances count as 0.0 here (unlike the series lists).
            ld = feat.get("l_foot_ball_distance") or 0.0
            rd = feat.get("r_foot_ball_distance") or 0.0
            post_buffer.append({"frame": fn, "mid_feet_ball_dist": round((ld + rd) / 2.0, 3)})
        record_series(feat)

    report = {
        "active_foot": active_foot,
        "touch_frame": touch_frame_no,
        "pre_action": pre_entries,
        "action_frame": action_data,
        "post_action": post_buffer,
        **series,
    }
    if is_pass_like:
        report["backward_weighted_angle"] = backward_weighted_angle
        report["forward_weighted_angle"] = forward_weighted_angle
    return report


def run_pipeline(cameras, utils_paths, sizes, progress_cb=None):
    """Analyse every annotated action across all camera videos.

    Args:
        cameras: ``{camera_id: video_path}``; at least two are required.
        utils_paths: mapping providing ``'POSE_PATH'``, ``'YOLO_PATH'``,
            ``'CALIBRATION_PATH'`` and ``'ACTIONS_PATH'``.
        sizes: forwarded to :func:`process_action_interval`.
        progress_cb: optional progress/cancellation callback, forwarded to
            :func:`process_action_interval`.

    Returns:
        One report dictionary per action. Actions that cannot be analysed
        yield ``{"error": ..., "action": label}`` instead of metrics.

    Raises:
        ValueError: for fewer than two cameras, unreadable videos,
            mismatching frame rates, or invalid calibration/action files.
    """
    if len(cameras) < 2:
        raise ValueError("Insufficient Cameras For 3D-based Analysis")

    temp_caps = {}
    cameras_fps = {}
    cameras_frame_counts = {}
    try:
        for camera, path in cameras.items():
            cap = cv2.VideoCapture(path)
            temp_caps[camera] = cap
            if not cap.isOpened():
                raise ValueError(f"Camera {camera} video could not be opened by OpenCV")
            cameras_fps[camera] = cap.get(cv2.CAP_PROP_FPS)
            cameras_frame_counts[camera] = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
    finally:
        # The probing handles are only needed for metadata.
        for cap in temp_caps.values():
            cap.release()

    if any(v <= 0 for v in cameras_fps.values()):
        raise ValueError("One or more uploaded videos has an invalid FPS or could not be decoded")
    if any(v <= 0 for v in cameras_frame_counts.values()):
        raise ValueError("One or more uploaded videos has zero readable frames")
    if len(set(round(v) for v in cameras_fps.values())) != 1:
        raise ValueError("Not Compatible Frame Rates")

    # Clamped to 1 so a sub-0.5 fps value cannot cause a division by zero.
    fps = max(1, int(round(next(iter(cameras_fps.values())))))
    common_frame_count = min(cameras_frame_counts.values())

    # Validate the cheap inputs before loading the pose model.
    Pn_dict, K_dict, dist_dict = prepare_cameras(utils_paths['CALIBRATION_PATH'])
    actions = load_action_intervals(utils_paths['ACTIONS_PATH'], fps)

    pose_model = VitInference(utils_paths['POSE_PATH'], None, model_name=None, dataset='coco_25', device='cpu', is_video=True)
    connections = ((17, 15), (15, 12), (18, 16), (16, 13), (12, 14), (13, 14),
                   (6, 5), (7, 5), (6, 8), (7, 9), (8, 10), (9, 11),
                   (1, 2), (0, 1), (0, 2), (1, 3), (2, 4),
                   (17, 21), (18, 24), (19, 20), (22, 23), (19, 21), (22, 24),
                   (5, 0), (6, 12), (7, 13),
                   (17, 19), (17, 20), (18, 22), (18, 23),
                   (19, 20), (20, 21), (19, 21), (22, 23), (23, 24), (22, 24))

    reports = []
    for i, action in enumerate(actions):
        if action["start_frame"] >= common_frame_count:
            max_time = max(0.0, (common_frame_count - 1) / fps)
            reports.append({
                "error": f"Action starts beyond the synchronized video range. Last common frame across all cameras is {common_frame_count - 1} ({max_time:.2f}s).",
                "action": action["label"]
            })
            continue
        if action["end_frame"] >= common_frame_count:
            max_time = max(0.0, (common_frame_count - 1) / fps)
            reports.append({
                "error": f"Action ends beyond the synchronized video range. Last common frame across all cameras is {common_frame_count - 1} ({max_time:.2f}s).",
                "action": action["label"]
            })
            continue

        history = process_action_interval(cameras, sizes, action, utils_paths['YOLO_PATH'], pose_model, Pn_dict, K_dict, dist_dict, connections, progress_cb, i, len(actions))
        if not history:
            reports.append({
                "error": "No synchronized frames could be read for this interval. Check that all videos are decodable, aligned, and long enough for the selected timestamps.",
                "action": action["label"]
            })
            continue

        middle = (action["start_frame"] + action["end_frame"]) // 2
        target_pair = proximity_target(history, middle)
        if target_pair is None:
            reports.append({"error": "No player was detected near the ball in the designated window.", "action": action["label"]})
            continue
        target_ball_id, target_player_id = target_pair[0], target_pair[1]

        analytics = analyze(window=history, target_player_id=target_player_id, action_type=action["label"])
        if analytics is None:
            reports.append({"error": "Insufficient skeletal keypoints to compute biomechanical metrics.", "action": action["label"]})
            continue

        skel_history = {
            frame_no: frame_data["persons_3d"].get(target_player_id)
            for frame_no, frame_data in history.items()
            if target_player_id in frame_data.get("persons_3d", {})
        }
        ball_history = {
            frame_no: frame_data["balls_3d"].get(target_ball_id)
            for frame_no, frame_data in history.items()
            if target_ball_id in frame_data.get("balls_3d", {})
        }
        reports.append({
            "action": action["label"],
            "start_frame": action["start_frame"],
            "end_frame": action["end_frame"],
            "fps": fps,
            "target_player": target_player_id,
            "skel_history": skel_history,
            "ball_history": ball_history,
            "analytics": analytics,
        })
    return reports