Spaces:
Running
Running
| import cv2 | |
| import math | |
| import copy | |
| import json | |
| import numpy as np | |
| import supervision as sv | |
| from ultralytics import YOLO | |
| import plotly.graph_objects as go | |
| from collections import defaultdict, deque | |
| from ViTPose.inference import VitInference | |
| from scipy.optimize import linear_sum_assignment | |
| def prepare_cameras(calibration_path): | |
| data = np.load(calibration_path) | |
| cam_ids = sorted({int(k[2:]) for k in data.files if k.startswith("Pn")}) | |
| Pn_dict, K_dict, dist_dict = {}, {}, {} | |
| for cid in cam_ids: | |
| p_key = f"Pn{cid}" | |
| k_key = f"K{cid}" | |
| d_key = f"dist{cid}" | |
| if p_key in data.files and k_key in data.files and d_key in data.files: | |
| Pn_dict[cid] = data[p_key] | |
| K_dict[cid] = data[k_key] | |
| dist_dict[cid] = data[d_key] | |
| if not Pn_dict: | |
| raise ValueError(f"No valid camera params found in: {calibration_path}") | |
| return Pn_dict, K_dict, dist_dict | |
| def rescale_points(persons, balls, current_size, target_size): | |
| sx = target_size[0] / current_size[0] | |
| sy = target_size[1] / current_size[1] | |
| persons = {pid: {idx: (x * sx, y * sy) for idx, (x, y) in skel.items()} for pid, skel in persons.items()} | |
| balls = {bid: (x1 * sx, y1 * sy, x2 * sx, y2 * sy) for bid, (x1, y1, x2, y2) in balls.items()} | |
| return persons, balls | |
| def detect_entities(frame, result, pose_model, target_size, margin_ratio=0.35): | |
| def iou(a, b): | |
| x1 = max(a[0], b[0]) | |
| y1 = max(a[1], b[1]) | |
| x2 = min(a[2], b[2]) | |
| y2 = min(a[3], b[3]) | |
| inter = max(0, x2 - x1) * max(0, y2 - y1) | |
| area_a = max(0, a[2] - a[0]) * max(0, a[3] - a[1]) | |
| area_b = max(0, b[2] - b[0]) * max(0, b[3] - b[1]) | |
| union = area_a + area_b - inter | |
| return inter / union if union > 0 else 0.0 | |
| def remove_overlaps(boxes, ids, confs, thr=0.65): | |
| keep = [] | |
| for i in np.argsort(-confs): | |
| if all(iou(boxes[i], boxes[j]) <= thr for j in keep): | |
| keep.append(i) | |
| return boxes[keep], [ids[i] for i in keep], confs[keep] | |
| persons = {} | |
| balls = {} | |
| fh, fw = frame.shape[:2] | |
| current_size = (fw, fh) | |
| person_boxes = np.empty((0, 4), dtype=np.float32) | |
| person_ids = [] | |
| person_confs = np.empty((0,), dtype=np.float32) | |
| if result.boxes is not None and result.boxes.id is not None and len(result.boxes) > 0: | |
| boxes = result.boxes.xyxy.cpu().numpy() | |
| ids = result.boxes.id.int().cpu().tolist() | |
| confs = result.boxes.conf.cpu().numpy() | |
| class_ids = result.boxes.cls.int().cpu().tolist() | |
| mask = np.array([(c >= 0.5) if cls == 0 else (c >= 0.25) for c, cls in zip(confs, class_ids)]) | |
| boxes = boxes[mask] | |
| confs = confs[mask] | |
| ids = [track_id for track_id, keep in zip(ids, mask) if keep] | |
| class_ids = [class_id for class_id, keep in zip(class_ids, mask) if keep] | |
| for box, track_id, score, class_id in zip(boxes, ids, confs, class_ids): | |
| x1, y1, x2, y2 = box | |
| track_id = int(track_id) | |
| class_id = int(class_id) | |
| if class_id == 0: | |
| w = x2 - x1 | |
| h = y2 - y1 | |
| x1e = max(0, x1 - margin_ratio * w) | |
| y1e = max(0, y1 - margin_ratio * h) | |
| x2e = min(fw, x2 + margin_ratio * w) | |
| y2e = min(fh, y2 + margin_ratio * h) | |
| person_boxes = np.vstack([person_boxes, np.array([x1e, y1e, x2e, y2e])[None, :]]) | |
| person_ids.append(track_id) | |
| person_confs = np.append(person_confs, score) | |
| elif class_id == 32: | |
| balls[track_id] = tuple(map(int, box)) | |
| if len(person_boxes): | |
| person_boxes, person_ids, person_confs = remove_overlaps(person_boxes, person_ids, person_confs, 0.75) | |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| poses = pose_model.inference_from_bboxes(frame_rgb, person_boxes, ids=person_ids, scores=person_confs.tolist()) | |
| for person_id, keypoints in poses.items(): | |
| skel = {} | |
| for i, point in enumerate(keypoints): | |
| y, x, score = point | |
| skel[i] = (float(x), float(y)) | |
| persons[int(person_id)] = skel | |
| persons, balls = rescale_points(persons, balls, current_size, target_size) | |
| return persons, balls | |
| def correspond(persons_2d, balls_2d, Pn_dict, K_dict, dist_dict, threshold_px=75): | |
| MATCH_KPTS = list(range(25)) | |
| INF = 1e9 | |
| def calc_F(Pni, Pnj, Ki, Kj): | |
| R1, t1 = Pni[:, :3], Pni[:, 3].reshape(3, 1) | |
| R2, t2 = Pnj[:, :3], Pnj[:, 3].reshape(3, 1) | |
| R21 = R2 @ R1.T | |
| t21 = t2 - R21 @ t1 | |
| tx = np.array([ | |
| [0, -t21[2, 0], t21[1, 0]], | |
| [t21[2, 0], 0, -t21[0, 0]], | |
| [-t21[1, 0], t21[0, 0], 0 ] | |
| ]) | |
| E = tx @ R21 | |
| F = np.linalg.inv(Kj).T @ E @ np.linalg.inv(Ki) | |
| U, S, Vt = np.linalg.svd(F) | |
| S[2] = 0.0 | |
| F = U @ np.diag(S) @ Vt | |
| return F / (F[2, 2] + 1e-12) | |
| def undistort_pt(pt, K, dist): | |
| pt_arr = np.array([[pt]], dtype=np.float64) | |
| pt_ud = cv2.undistortPoints(pt_arr, K, dist, P=K) | |
| return np.array([pt_ud[0, 0, 0], pt_ud[0, 0, 1], 1.0]) | |
| def sampson_dist(pti, ptj, Ki, Kj, disti, distj, F): | |
| x1 = undistort_pt(pti, Ki, disti) | |
| x2 = undistort_pt(ptj, Kj, distj) | |
| Fx1 = F @ x1 | |
| Ftx2 = F.T @ x2 | |
| num = (x2 @ Fx1) ** 2 | |
| den = Fx1[0]**2 + Fx1[1]**2 + Ftx2[0]**2 + Ftx2[1]**2 + 1e-12 | |
| return float(np.sqrt(abs(num / den))) | |
| def pair_cost_persons(ref_skl, cand_skl, Ki, Kj, di, dj, F, min_kpts=3): | |
| dists = [] | |
| for k in MATCH_KPTS: | |
| if k in ref_skl and k in cand_skl: | |
| dists.append(sampson_dist(ref_skl[k], cand_skl[k], Ki, Kj, di, dj, F)) | |
| if len(dists) < min_kpts: | |
| return INF | |
| dists = np.array(dists, dtype=float) | |
| if len(dists) >= 4: | |
| med = np.median(dists) | |
| mad = np.median(np.abs(dists - med)) | |
| if mad > 1e-6: | |
| dists = dists[np.abs(dists - med) <= 2.5 * mad] | |
| if len(dists) < min_kpts: | |
| return INF | |
| return float(np.median(dists)) | |
| def pair_cost_ball(rc, cc, Ki, Kj, di, dj, F): | |
| return sampson_dist(rc, cc, Ki, Kj, di, dj, F) | |
| def hungarian_match(cost_matrix, row_ids, col_ids): | |
| if cost_matrix.size == 0: | |
| return {} | |
| row_ind, col_ind = linear_sum_assignment(cost_matrix) | |
| matches = {} | |
| for r, c in zip(row_ind, col_ind): | |
| if cost_matrix[r, c] < threshold_px: | |
| matches[row_ids[r]] = col_ids[c] | |
| return matches | |
| def apply_symmetry_gate(matches, cost_matrix, rid_idx, cids): | |
| accepted = {} | |
| for rid, cid in matches.items(): | |
| ri = rid_idx[rid] | |
| ci = cids.index(cid) | |
| forward_cost = cost_matrix[ri] | |
| valid = forward_cost[forward_cost < INF] | |
| if len(valid) >= 2: | |
| sorted_costs = np.sort(valid) | |
| if sorted_costs[1] < 1.3 * sorted_costs[0] and cost_matrix[ri, ci] > threshold_px * 0.4: | |
| continue | |
| accepted[rid] = cid | |
| return accepted | |
| def avg_cost(acc, cnt): | |
| with np.errstate(invalid='ignore', divide='ignore'): | |
| return np.where(cnt > 0, acc / cnt, INF) | |
| def centre(b): | |
| return ((b[0] + b[2]) / 2, (b[1] + b[3]) / 2) | |
| cam_ids = sorted(persons_2d.keys()) | |
| cam0 = cam_ids[0] | |
| next_person_id = max(persons_2d[cam0].keys(), default=-1) + 1 | |
| next_ball_id = max(balls_2d[cam0].keys(), default=-1) + 1 | |
| for idx in range(1, len(cam_ids)): | |
| cam_j = cam_ids[idx] | |
| curr_persons = persons_2d[cam_j] | |
| curr_balls = balls_2d[cam_j] | |
| p_rids = sorted(set(rid for ci in cam_ids[:idx] for rid in persons_2d[ci])) | |
| p_cids = list(curr_persons.keys()) | |
| b_rids = sorted(set(rid for ci in cam_ids[:idx] for rid in balls_2d[ci])) | |
| b_cids = list(curr_balls.keys()) | |
| P_acc = np.zeros((len(p_rids), len(p_cids))) if (p_rids and p_cids) else None | |
| P_cnt = np.zeros_like(P_acc) if P_acc is not None else None | |
| B_acc = np.zeros((len(b_rids), len(b_cids))) if (b_rids and b_cids) else None | |
| B_cnt = np.zeros_like(B_acc) if B_acc is not None else None | |
| p_rid_idx = {rid: i for i, rid in enumerate(p_rids)} | |
| b_rid_idx = {rid: i for i, rid in enumerate(b_rids)} | |
| for cam_i in cam_ids[:idx]: | |
| F = calc_F(Pn_dict[cam_i], Pn_dict[cam_j], K_dict[cam_i], K_dict[cam_j]) | |
| Ki, Kj = K_dict[cam_i], K_dict[cam_j] | |
| di, dj = dist_dict[cam_i], dist_dict[cam_j] | |
| if P_acc is not None: | |
| for rid, rskl in persons_2d[cam_i].items(): | |
| ri = p_rid_idx[rid] | |
| for ci, cid in enumerate(p_cids): | |
| c = pair_cost_persons(rskl, curr_persons[cid], Ki, Kj, di, dj, F) | |
| if c < INF: | |
| P_acc[ri, ci] += c | |
| P_cnt[ri, ci] += 1 | |
| if B_acc is not None: | |
| for rid, rb in balls_2d[cam_i].items(): | |
| ri = b_rid_idx[rid] | |
| rc = centre(rb) | |
| for ci, cid in enumerate(b_cids): | |
| c = pair_cost_ball(rc, centre(curr_balls[cid]), Ki, Kj, di, dj, F) | |
| if c < INF: | |
| B_acc[ri, ci] += c | |
| B_cnt[ri, ci] += 1 | |
| ordered_persons = {} | |
| used_p = set() | |
| if P_acc is not None: | |
| P_cost = avg_cost(P_acc, P_cnt) | |
| for rid, cid in apply_symmetry_gate( | |
| hungarian_match(P_cost, p_rids, p_cids), P_cost, p_rid_idx, p_cids | |
| ).items(): | |
| ordered_persons[rid] = curr_persons[cid] | |
| used_p.add(cid) | |
| for cid in p_cids: | |
| if cid not in used_p: | |
| ordered_persons[next_person_id] = curr_persons[cid] | |
| next_person_id += 1 | |
| ordered_balls = {} | |
| used_b = set() | |
| if B_acc is not None: | |
| B_cost = avg_cost(B_acc, B_cnt) | |
| for rid, cid in apply_symmetry_gate( | |
| hungarian_match(B_cost, b_rids, b_cids), B_cost, b_rid_idx, b_cids | |
| ).items(): | |
| ordered_balls[rid] = curr_balls[cid] | |
| used_b.add(cid) | |
| for cid in b_cids: | |
| if cid not in used_b: | |
| ordered_balls[next_ball_id] = curr_balls[cid] | |
| next_ball_id += 1 | |
| persons_2d[cam_j] = ordered_persons | |
| balls_2d[cam_j] = ordered_balls | |
| return persons_2d, balls_2d | |
| def stereo(persons_2d, balls_2d, Pn_dict, K_dict, dist_dict): | |
| def triangulate(points_2d, projection_matrices): | |
| A = [] | |
| for i in range(len(points_2d)): | |
| u, v = points_2d[i] | |
| P = projection_matrices[i] | |
| A.append(u * P[2, :] - P[0, :]) | |
| A.append(v * P[2, :] - P[1, :]) | |
| A = np.array(A) | |
| _, _, Vh = np.linalg.svd(A) | |
| X_h = Vh[-1, :] | |
| return (X_h[:3] / X_h[3]).flatten() | |
| def undistort_point(pt, K, dist): | |
| pt_arr = np.array(pt, dtype=np.float32).reshape(1, 1, 2) | |
| pt_ud = cv2.undistortPoints(pt_arr, K, dist, P=K) | |
| return pt_ud[0, 0] | |
| persons_3d = {} | |
| all_person_ids = set() | |
| for cam_persons in persons_2d.values(): all_person_ids.update(cam_persons.keys()) | |
| for pid in all_person_ids: | |
| person_cams = {cid: skel for cid, c_pers in persons_2d.items() if pid in c_pers for skel in [c_pers[pid]]} | |
| if len(person_cams) < 2: continue | |
| all_kpt_indices = set() | |
| for skel in person_cams.values(): all_kpt_indices.update(skel.keys()) | |
| person_3d = {} | |
| for kpt_idx in all_kpt_indices: | |
| pts_to_triangulate = [] | |
| pns_to_triangulate = [] | |
| for cid, skel in person_cams.items(): | |
| if kpt_idx in skel: | |
| ud_pt = undistort_point(skel[kpt_idx], K_dict[cid], dist_dict[cid]) | |
| pts_to_triangulate.append(ud_pt) | |
| P = K_dict[cid] @ Pn_dict[cid] | |
| pns_to_triangulate.append(P) | |
| if len(pts_to_triangulate) >= 2: person_3d[kpt_idx] = triangulate(pts_to_triangulate, pns_to_triangulate) | |
| if person_3d: | |
| persons_3d[pid] = person_3d | |
| balls_3d = {} | |
| all_ball_ids = set() | |
| for cam_balls in balls_2d.values(): all_ball_ids.update(cam_balls.keys()) | |
| for bid in all_ball_ids: | |
| pts_to_triangulate = [] | |
| pns_to_triangulate = [] | |
| for cid, c_balls in balls_2d.items(): | |
| if bid in c_balls: | |
| bbox = c_balls[bid] | |
| center = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2) | |
| ud_pt = undistort_point(center, K_dict[cid], dist_dict[cid]) | |
| pts_to_triangulate.append(ud_pt) | |
| P = K_dict[cid] @ Pn_dict[cid] | |
| pns_to_triangulate.append(P) | |
| if len(pts_to_triangulate) >= 2: balls_3d[bid] = triangulate(pts_to_triangulate, pns_to_triangulate) | |
| return persons_3d, balls_3d | |
| def load_action_intervals(json_path, fps): | |
| timecode_fps = max(1, int(round(float(fps)))) | |
| def timestamp_to_frame(timestamp, action_number, field_name): | |
| if not isinstance(timestamp, str): | |
| raise ValueError(f"Invalid time format in action {action_number}. {field_name} must use HH:MM:SS:FF.") | |
| parts = timestamp.strip().split(":") | |
| if len(parts) != 4: | |
| raise ValueError(f'Invalid time format "{timestamp}" in action {action_number}. Use HH:MM:SS:FF.') | |
| try: | |
| h, m, s, f = [int(part) for part in parts] | |
| except ValueError: | |
| raise ValueError(f'Invalid time format "{timestamp}" in action {action_number}. Use integer timecode values.') | |
| if h < 0 or m < 0 or m > 59 or s < 0 or s > 59 or f < 0: | |
| raise ValueError(f'Invalid time format "{timestamp}" in action {action_number}. Minutes and seconds must be between 00 and 59.') | |
| if f >= timecode_fps: | |
| raise ValueError( | |
| f'Invalid time format "{timestamp}" in action {action_number}. ' | |
| f'At {timecode_fps}fps, frame values must be between 00 and {timecode_fps - 1:02d}.' | |
| ) | |
| return int((h * 3600 + m * 60 + s) * timecode_fps + f) | |
| with open(json_path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| raw_actions = data.get("actions") if isinstance(data, dict) else None | |
| if not isinstance(raw_actions, list) or not raw_actions: | |
| raise ValueError("No actions array found.") | |
| actions = [] | |
| for index, item in enumerate(raw_actions): | |
| action_number = index + 1 | |
| if not isinstance(item, dict): | |
| raise ValueError(f"Action {action_number} must be an object.") | |
| label = item.get("label") | |
| if not isinstance(label, str) or not label.strip(): | |
| raise ValueError(f"Action {action_number} must include a label.") | |
| start_frame = timestamp_to_frame(item.get("start"), action_number, "start") | |
| end_frame = timestamp_to_frame(item.get("end"), action_number, "end") | |
| if end_frame < start_frame: | |
| raise ValueError(f"Invalid action interval in action {action_number}. End must not be before start.") | |
| actions.append({"label": label.strip(), "start_frame": start_frame, "end_frame": end_frame}) | |
| return actions | |
| def process_action_interval(cameras, sizes, action, yolo_path, pose_model, Pn_dict, K_dict, dist_dict, connections, progress_cb=None, current_act=0, total_act=1): | |
| cameras_caps = {} | |
| cameras_persons = {} | |
| cameras_balls = {} | |
| cameras_models = {} | |
| for camera in cameras.keys(): | |
| cap = cv2.VideoCapture(cameras[camera]) | |
| cameras_caps[camera] = cap | |
| cameras_persons[camera] = None | |
| cameras_balls[camera] = None | |
| cameras_models[camera] = YOLO(yolo_path) | |
| first_cap = next(iter(cameras_caps.values())) | |
| infer_size = (int(first_cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(first_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) | |
| target_size = sizes['TARGET_SIZE'] | |
| yolo_imgsz = sizes['YOLO_IMGSZ'] | |
| start_frame = action["start_frame"] | |
| end_frame = action["end_frame"] | |
| for cap in cameras_caps.values(): | |
| cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame) | |
| history = {} | |
| total_frames = end_frame - start_frame + 1 | |
| for step, frame_no in enumerate(range(start_frame, end_frame + 1)): | |
| if progress_cb: | |
| # Check if cancellation was requested | |
| should_continue = progress_cb(current_act, total_act, step, max(1, total_frames)) | |
| if should_continue is False: | |
| raise InterruptedError("Processing cancelled by user") | |
| temp_cameras_frames = {} | |
| cameras_frames = {} | |
| cameras_rets = {} | |
| for camera in cameras: | |
| ret, frame = cameras_caps[camera].read() | |
| temp_cameras_frames[camera] = frame | |
| cameras_rets[camera] = ret | |
| not_working_cameras = [camera for camera in cameras_rets if not cameras_rets[camera]] | |
| if (len(cameras) - len(not_working_cameras)) < 2: | |
| break | |
| for camera in cameras: | |
| if camera not in not_working_cameras: | |
| cameras_frames[camera] = temp_cameras_frames[camera] | |
| for camera in cameras_frames: | |
| result = cameras_models[camera].track(cameras_frames[camera], persist=True, verbose=False, imgsz=yolo_imgsz)[0] | |
| persons, balls = detect_entities(cameras_frames[camera], result, pose_model, target_size) | |
| cameras_persons[camera] = persons | |
| cameras_balls[camera] = balls | |
| cameras_persons, cameras_balls = correspond(cameras_persons, cameras_balls, Pn_dict, K_dict, dist_dict) | |
| persons_3d, balls_3d = stereo(cameras_persons, cameras_balls, Pn_dict, K_dict, dist_dict) | |
| history[frame_no] = { | |
| "persons_3d": copy.deepcopy(persons_3d), | |
| "persons_2d": copy.deepcopy(cameras_persons), | |
| "balls_3d": copy.deepcopy(balls_3d), | |
| "balls_2d": copy.deepcopy(cameras_balls), | |
| } | |
| for cap in cameras_caps.values(): | |
| cap.release() | |
| return history | |
| def proximity_target(history, middle): | |
| for i in range(middle-3, middle+4): | |
| frame_data = history.get(i) | |
| if frame_data is None: | |
| continue | |
| if (not frame_data['persons_3d']) or (not frame_data['balls_3d']): | |
| continue | |
| pairs = {} | |
| for ball_id, ball_center in frame_data['balls_3d'].items(): | |
| closest_person = None | |
| closest_distance = float('inf') | |
| for person_id, person_skel in frame_data['persons_3d'].items(): | |
| person_center = (np.array(person_skel[17]) + np.array(person_skel[18])) / 2 | |
| distance = math.dist(person_center, ball_center) | |
| if distance < closest_distance: | |
| closest_distance = distance | |
| closest_person = person_id | |
| if closest_person is not None: | |
| pairs[ball_id] = [closest_person, closest_distance] | |
| global_closest_ball = None | |
| global_closest_person = None | |
| global_closest_distance = float('inf') | |
| for ball_id, closest_details in pairs.items(): | |
| if closest_details[1] < global_closest_distance: | |
| global_closest_person = closest_details[0] | |
| global_closest_distance = closest_details[1] | |
| global_closest_ball = ball_id | |
| if global_closest_distance <= 150: | |
| return (global_closest_ball, global_closest_person, global_closest_distance) | |
| return None | |
| def analyze(window, target_player_id, action_type, zone_entry_ball_pos=None): | |
| def kpt(skel, i): | |
| v = skel.get(i) | |
| return np.array(v, float) if v is not None else None | |
| def dist(p1, p2): | |
| if p1 is None or p2 is None: return None | |
| return float(np.linalg.norm(np.array(p1, float) - np.array(p2, float))) | |
| def vec_angle(v1, v2): | |
| v1, v2 = np.array(v1, float), np.array(v2, float) | |
| n1, n2 = np.linalg.norm(v1), np.linalg.norm(v2) | |
| if n1 < 1e-9 or n2 < 1e-9: return None | |
| return round(float(np.degrees(np.arccos(np.clip(np.dot(v1/n1, v2/n2), -1.0, 1.0)))), 3) | |
| def angle3(a, b, c): | |
| if a is None or b is None or c is None: return None | |
| return vec_angle(np.array(a, float)-np.array(b, float), np.array(c, float)-np.array(b, float)) | |
| def find_ball(p3d, b3d, pid): | |
| if not b3d: return None, None | |
| skel = p3d.get(pid, {}) | |
| # ViTPose 25: l_ankle=17, r_ankle=18 | |
| la, ra = kpt(skel, 17), kpt(skel, 18) | |
| pivot = (la+ra)/2.0 if la is not None and ra is not None else None | |
| best_bid, best_pos, best_d = None, None, float('inf') | |
| for bid, bpos in b3d.items(): | |
| if bpos is None: continue | |
| bpos = np.array(bpos, float) | |
| d = float(np.linalg.norm(pivot - bpos)) if pivot is not None else 0.0 | |
| if d < best_d: best_d, best_bid, best_pos = d, bid, bpos | |
| return best_bid, best_pos | |
| def compute_features(p3d, b3d, pid): | |
| skel = p3d.get(pid) | |
| if skel is None: return None | |
| def g(i): return kpt(skel, i) | |
| # ViTPose 25 keypoint indices: | |
| # 0=nose, 1=l_eye, 2=r_eye, 3=l_ear, 4=r_ear | |
| # 5=neck | |
| # 6=l_shoulder, 7=r_shoulder | |
| # 8=l_elbow, 9=r_elbow | |
| # 10=l_wrist, 11=r_wrist | |
| # 12=l_hip, 13=r_hip | |
| # 15=l_knee, 16=r_knee | |
| # 17=l_ankle, 18=r_ankle | |
| # 19=l_big_toe, 20=l_small_toe, 21=l_heel | |
| # 22=r_big_toe, 23=r_small_toe, 24=r_heel | |
| nose, l_eye, r_eye = g(0), g(1), g(2) | |
| l_shoulder, r_shoulder = g(6), g(7) | |
| l_hip, r_hip = g(12), g(13) | |
| l_knee, r_knee = g(15), g(16) | |
| l_ankle, r_ankle = g(17), g(18) | |
| l_heel, r_heel = g(21), g(24) | |
| l_toe, r_toe = g(19), g(22) | |
| _, ball_center = find_ball(p3d, b3d, pid) | |
| l_knee_angle = angle3(l_hip, l_knee, l_ankle) | |
| r_knee_angle = angle3(r_hip, r_knee, r_ankle) | |
| head_angle = mid_shoulder = mid_hip = None | |
| if all(v is not None for v in (l_shoulder, r_shoulder, l_hip, r_hip)): | |
| mid_shoulder = (l_shoulder + r_shoulder) / 2.0 | |
| mid_hip = (l_hip + r_hip) / 2.0 | |
| if nose is not None: | |
| head_angle = angle3(nose, mid_shoulder, mid_hip) | |
| trunc_pitch_angle = trunc_roll_angle = head_pitch_angle = head_roll_angle = None | |
| if mid_shoulder is not None and mid_hip is not None: | |
| Y = np.array([0.0, 1.0, 0.0]) | |
| Hv = np.array([l_hip[0], mid_hip[1], l_hip[2]]) - mid_hip | |
| Hvn = np.linalg.norm(Hv) or 1.0 | |
| X_ = Hv / Hvn | |
| cross = np.cross(X_, Y) | |
| cn = np.linalg.norm(cross) | |
| if cn < 1e-9: cross, cn = np.array([0.0, 0.0, 1.0]), 1.0 | |
| Z_ = cross / cn | |
| R = np.column_stack((X_, Y, Z_)) | |
| def loc(pt): return R.T @ (pt - mid_hip) | |
| msl = loc(mid_shoulder) | |
| trunc_pitch_angle = angle3((0, msl[1], msl[2]), (0,0,0), (0,-1,0)) | |
| trunc_roll_angle = angle3((msl[0], msl[1], 0), (0,0,0), (0,-1,0)) | |
| if nose is not None and l_eye is not None and r_eye is not None: | |
| meil = loc((l_eye + r_eye) / 2.0) | |
| nloc = loc(nose) | |
| head_pitch_angle = angle3((0, msl[1], msl[2]), (0, meil[1], meil[2]), (0, nloc[1], nloc[2])) | |
| head_roll_angle = angle3((msl[0], msl[1], 0), (meil[0], meil[1], 0), (nloc[0], nloc[1], 0)) | |
| left_foot_orientation_angle = right_foot_orientation_angle = difference_in_angles = None | |
| if all(v is not None for v in (l_hip, r_hip, l_heel, r_heel, l_toe, r_toe)): | |
| body_dir = r_hip - l_hip | |
| body_fwd = np.array([-body_dir[2], 0.0, body_dir[0]]) | |
| bfwd_2d = body_fwd[[0, 2]] | |
| bn = np.linalg.norm(bfwd_2d) | |
| if bn > 1e-6: | |
| bfwd_2d /= bn | |
| for foot_heel, foot_toe, side in ((l_heel, l_toe, "left"), (r_heel, r_toe, "right")): | |
| fv = (foot_toe - foot_heel)[[0, 2]] | |
| fn_ = np.linalg.norm(fv) | |
| if fn_ > 1e-6: | |
| fv /= fn_ | |
| ang = float(np.degrees(np.arccos(np.clip(np.dot(fv, bfwd_2d), -1.0, 1.0)))) | |
| if side == "left": left_foot_orientation_angle = round(ang, 3) | |
| else: right_foot_orientation_angle = round(ang, 3) | |
| if left_foot_orientation_angle is not None and right_foot_orientation_angle is not None: | |
| difference_in_angles = abs(left_foot_orientation_angle - right_foot_orientation_angle) | |
| l_r_foot_distance = None | |
| if all(v is not None for v in (l_ankle, r_ankle, l_knee)): | |
| akl = dist(l_ankle, l_knee) | |
| if akl and akl > 0: | |
| l_r_foot_distance = round(dist(l_ankle, r_ankle), 3) | |
| l_foot_ball_distance = r_foot_ball_distance = None | |
| if ball_center is not None: | |
| if l_ankle is not None: l_foot_ball_distance = round(float(dist(l_ankle, ball_center)), 3) | |
| if r_ankle is not None: r_foot_ball_distance = round(float(dist(r_ankle, ball_center)), 3) | |
| return { | |
| "head_angle": head_angle, "l_knee_angle": l_knee_angle, "r_knee_angle": r_knee_angle, | |
| "trunc_pitch_angle": trunc_pitch_angle, "trunc_roll_angle": trunc_roll_angle, | |
| "head_pitch_angle": head_pitch_angle, "head_roll_angle": head_roll_angle, | |
| "left_foot_orientation_angle": left_foot_orientation_angle, | |
| "right_foot_orientation_angle": right_foot_orientation_angle, | |
| "difference_in_angles": difference_in_angles, | |
| "l_r_foot_distance": l_r_foot_distance, | |
| "l_foot_ball_distance": l_foot_ball_distance, "r_foot_ball_distance": r_foot_ball_distance, | |
| "ball_position": ball_center.tolist() if ball_center is not None else None, | |
| "l_ankle": l_ankle, "r_ankle": r_ankle, | |
| } | |
| def body_to_ball_angle(skel, ball_pos): | |
| if ball_pos is None: return None | |
| # ViTPose 25: l_shoulder=6, r_shoulder=7 | |
| l_shoulder, r_shoulder = kpt(skel, 6), kpt(skel, 7) | |
| if l_shoulder is None or r_shoulder is None: return None | |
| forward = r_shoulder - l_shoulder | |
| forward[1] = 0.0 | |
| # ViTPose 25: l_ankle=17, r_ankle=18 | |
| l_ankle, r_ankle = kpt(skel, 17), kpt(skel, 18) | |
| if l_ankle is None or r_ankle is None: | |
| # fallback to heels: l_heel=21, r_heel=24 | |
| l_ankle, r_ankle = kpt(skel, 21), kpt(skel, 24) | |
| feet_mid = (l_ankle + r_ankle) / 2.0 | |
| to_ball = ball_pos - feet_mid | |
| to_ball[1] = 0.0 | |
| return vec_angle(forward, to_ball) | |
| def body_vs_ball(skel, ball_travel_vec): | |
| # ViTPose 25: l_hip=12, r_hip=13 | |
| lh, rh = kpt(skel, 12), kpt(skel, 13) | |
| if lh is None or rh is None: return None | |
| body_dir = rh - lh | |
| forward = np.array([-body_dir[2], 0.0, body_dir[0]]) | |
| bv = np.array(ball_travel_vec, float).copy() | |
| bv[1] = 0.0 | |
| return vec_angle(forward, bv) | |
| def leg_separation_angle(skel, active_foot): | |
| # ViTPose 25: l_heel=21, r_heel=24, l_hip=12, r_hip=13 | |
| l_heel, r_heel = kpt(skel, 21), kpt(skel, 24) | |
| l_hip, r_hip = kpt(skel, 12), kpt(skel, 13) | |
| if any(x is None for x in (l_heel, r_heel, l_hip, r_hip)): return None | |
| if active_foot == "Left": return vec_angle(l_hip - l_heel, r_hip - r_heel) | |
| return vec_angle(r_hip - r_heel, l_hip - l_heel) | |
| def elbow_angle(skel, side): | |
| # ViTPose 25: l_shoulder=6, l_elbow=8, l_wrist=10 | |
| # r_shoulder=7, r_elbow=9, r_wrist=11 | |
| if side == 'left': | |
| shoulder, elbow, wrist = kpt(skel, 6), kpt(skel, 8), kpt(skel, 10) | |
| else: | |
| shoulder, elbow, wrist = kpt(skel, 7), kpt(skel, 9), kpt(skel, 11) | |
| if any(x is None for x in (shoulder, elbow, wrist)): return None | |
| return angle3(shoulder, elbow, wrist) | |
| BALL_RADIUS_CM = 11 | |
| sorted_frames = sorted(window.keys()) | |
| n = len(sorted_frames) | |
| zone_entry_ball_pos = np.array(zone_entry_ball_pos, float) if zone_entry_ball_pos is not None else None | |
| if zone_entry_ball_pos is None: | |
| for fn in sorted_frames: | |
| fd = window[fn] | |
| _, bpos = find_ball(fd.get("persons_3d", {}), fd.get("balls_3d", {}), target_player_id) | |
| if bpos is not None: zone_entry_ball_pos = bpos; break | |
| if action_type == "Dribble": | |
| results = [] | |
| left_knee_angles, right_knee_angles, torso_pitch_angles, head_angles = [], [], [], [] | |
| mid_foot_ball_distances, left_right_foot_distances = [], [] | |
| for fn in sorted_frames: | |
| fd = window[fn] | |
| p3d, b3d = fd.get("persons_3d", {}), fd.get("balls_3d", {}) | |
| skel = p3d.get(target_player_id, {}) | |
| _, ball_pos = find_ball(p3d, b3d, target_player_id) | |
| feat = compute_features(p3d, b3d, target_player_id) or {} | |
| # ViTPose 25: l_ankle=17, r_ankle=18 | |
| l_ankle, r_ankle = kpt(skel, 17), kpt(skel, 18) | |
| feet_mid = (l_ankle + r_ankle) / 2.0 if l_ankle is not None and r_ankle is not None else None | |
| ball_feet_dist = round(float(np.linalg.norm(ball_pos - feet_mid)), 3) if ball_pos is not None and feet_mid is not None else None | |
| results.append({"frame": fn, "ball_feet_distance": ball_feet_dist, | |
| "trunk_pitch": feat.get("trunc_pitch_angle"), "trunk_roll": feat.get("trunc_roll_angle"), | |
| "head_angle": feat.get("head_angle"), | |
| "left_elbow_angle": elbow_angle(skel, 'left'), "right_elbow_angle": elbow_angle(skel, 'right')}) | |
| left_knee_angles.append(feat.get("l_knee_angle")) | |
| right_knee_angles.append(feat.get("r_knee_angle")) | |
| torso_pitch_angles.append(feat.get("trunc_pitch_angle")) | |
| head_angles.append(feat.get("head_angle")) | |
| lfd, rfd = feat.get("l_foot_ball_distance"), feat.get("r_foot_ball_distance") | |
| mid_foot_ball_distances.append(round((lfd+rfd)/2.0, 3) if lfd is not None and rfd is not None else None) | |
| left_right_foot_distances.append(feat.get("l_r_foot_distance")) | |
| return {"frames": results, "left_knee_angles": left_knee_angles, | |
| "right_knee_angles": right_knee_angles, "torso_pitch_angles": torso_pitch_angles, | |
| "head_angles": head_angles, "mid_foot_ball_distances": mid_foot_ball_distances, | |
| "left_right_foot_distances": left_right_foot_distances} | |
| touch_idx = (n - 1) // 2 | |
| touch_frame_no = sorted_frames[touch_idx] | |
| _touch_fd_tmp = window[sorted_frames[touch_idx]] | |
| _, _action_ball_pos = find_ball(_touch_fd_tmp.get("persons_3d", {}), _touch_fd_tmp.get("balls_3d", {}), target_player_id) | |
| fixed_ball_travel_vec = (_action_ball_pos - zone_entry_ball_pos) if zone_entry_ball_pos is not None and _action_ball_pos is not None else None | |
| first_skel = window[sorted_frames[0]].get("persons_3d", {}).get(target_player_id, {}) | |
| contact_skel = window[touch_frame_no].get("persons_3d", {}).get(target_player_id, {}) | |
| # ViTPose 25: l_ankle=17, r_ankle=18 | |
| fl, fr = kpt(first_skel, 17), kpt(first_skel, 18) | |
| cl, cr = kpt(contact_skel, 17), kpt(contact_skel, 18) | |
| active_foot = "Left" | |
| if all(v is not None for v in (fl, fr, cl, cr)): | |
| active_foot = "Left" if (dist(cl, fl) or 0.0) > (dist(cr, fr) or 0.0) else "Right" | |
| pre_start_idx = 0 | |
| pre_entries = [] | |
| left_knee_angles, right_knee_angles, torso_pitch_angles, head_angles = [], [], [], [] | |
| mid_foot_ball_distances, left_right_foot_distances = [], [] | |
| is_pass_like = action_type in ("Pass", "Shoot") | |
| for fi in range(pre_start_idx, touch_idx): | |
| fn = sorted_frames[fi] | |
| fd = window[fn] | |
| p3d, b3d = fd.get("persons_3d", {}), fd.get("balls_3d", {}) | |
| feat = compute_features(p3d, b3d, target_player_id) or {} | |
| _, ball_pos = find_ball(p3d, b3d, target_player_id) | |
| skel = p3d.get(target_player_id, {}) | |
| if is_pass_like: | |
| pre_entries.append({"frame": fn, "body_to_ball_angle": body_to_ball_angle(skel, ball_pos)}) | |
| else: | |
| bov = body_vs_ball(skel, fixed_ball_travel_vec) if fixed_ball_travel_vec is not None else None | |
| pre_entries.append({"frame": fn, "body_orientation_vs_ball": bov, "head_angle": feat.get("head_angle")}) | |
| left_knee_angles.append(feat.get("l_knee_angle")) | |
| right_knee_angles.append(feat.get("r_knee_angle")) | |
| torso_pitch_angles.append(feat.get("trunc_pitch_angle")) | |
| head_angles.append(feat.get("head_angle")) | |
| lfd, rfd = feat.get("l_foot_ball_distance"), feat.get("r_foot_ball_distance") | |
| mid_foot_ball_distances.append(round((lfd+rfd)/2.0, 3) if lfd is not None and rfd is not None else None) | |
| left_right_foot_distances.append(feat.get("l_r_foot_distance")) | |
| touch_fd = window[touch_frame_no] | |
| touch_p3d, touch_b3d = touch_fd.get("persons_3d", {}), touch_fd.get("balls_3d", {}) | |
| touch_skel = touch_p3d.get(target_player_id, {}) | |
| _, touch_ball = find_ball(touch_p3d, touch_b3d, target_player_id) | |
| touch_feat = compute_features(touch_p3d, touch_b3d, target_player_id) or {} | |
| action_body_ball_angle = body_to_ball_angle(touch_skel, touch_ball) | |
| left_knee_angles.append(touch_feat.get("l_knee_angle")) | |
| right_knee_angles.append(touch_feat.get("r_knee_angle")) | |
| torso_pitch_angles.append(touch_feat.get("trunc_pitch_angle")) | |
| head_angles.append(touch_feat.get("head_angle")) | |
| lfd_a, rfd_a = touch_feat.get("l_foot_ball_distance"), touch_feat.get("r_foot_ball_distance") | |
| mid_foot_ball_distances.append(round((lfd_a+rfd_a)/2.0, 3) if lfd_a is not None and rfd_a is not None else None) | |
| left_right_foot_distances.append(touch_feat.get("l_r_foot_distance")) | |
| if is_pass_like: | |
| action_data = { | |
| "body_to_ball_angle": action_body_ball_angle, | |
| "l_r_foot_distance": touch_feat.get("l_r_foot_distance"), | |
| "trunc_pitch_angle": touch_feat.get("trunc_pitch_angle"), | |
| "trunc_roll_angle": touch_feat.get("trunc_roll_angle"), | |
| "left_foot_orientation_angle": touch_feat.get("left_foot_orientation_angle"), | |
| "right_foot_orientation_angle": touch_feat.get("right_foot_orientation_angle"), | |
| "difference_in_angles": touch_feat.get("difference_in_angles"), | |
| "l_knee_angle": touch_feat.get("l_knee_angle"), | |
| "r_knee_angle": touch_feat.get("r_knee_angle"), | |
| "head_angle": touch_feat.get("head_angle"), | |
| "head_pitch_angle": touch_feat.get("head_pitch_angle"), | |
| "head_roll_angle": touch_feat.get("head_roll_angle"), | |
| "active_foot": active_foot, | |
| } | |
| else: | |
| bov_vals = [e["body_orientation_vs_ball"] for e in pre_entries if e.get("body_orientation_vs_ball") is not None] | |
| action_data = { | |
| "head_angle": touch_feat.get("head_angle"), | |
| "l_knee_angle": touch_feat.get("l_knee_angle"), | |
| "r_knee_angle": touch_feat.get("r_knee_angle"), | |
| "trunc_pitch_angle": touch_feat.get("trunc_pitch_angle"), | |
| "trunc_roll_angle": touch_feat.get("trunc_roll_angle"), | |
| "left_foot_orientation_angle": touch_feat.get("left_foot_orientation_angle"), | |
| "right_foot_orientation_angle": touch_feat.get("right_foot_orientation_angle"), | |
| "difference_in_angles": touch_feat.get("difference_in_angles"), | |
| "l_r_foot_distance": touch_feat.get("l_r_foot_distance"), | |
| "avg_body_orientation_vs_ball": round(sum(bov_vals)/len(bov_vals), 3) if bov_vals else None, | |
| } | |
| if touch_skel and touch_ball is not None and zone_entry_ball_pos is not None: | |
| btv = touch_ball - zone_entry_ball_pos | |
| action_data["body_orientation_vs_ball"] = body_vs_ball(touch_skel, btv) | |
| if touch_skel and touch_ball is not None: | |
| # ViTPose 25: l_heel=21, r_heel=24 | |
| heel = kpt(touch_skel, 21 if active_foot == "Left" else 24) | |
| if heel is not None: | |
| action_data["active_foot_height_pct"] = round(float(heel[1] - touch_ball[1]) / BALL_RADIUS_CM * 100.0, 3) | |
| backward_weighted_angle = forward_weighted_angle = None | |
| if is_pass_like: | |
| bwd_angles = [a for fi in range(pre_start_idx, touch_idx+1) | |
| if (a := leg_separation_angle(window[sorted_frames[fi]].get("persons_3d", {}).get(target_player_id, {}), active_foot)) is not None] | |
| if bwd_angles: backward_weighted_angle = max(bwd_angles) | |
| fwd_angles = [a for fi in range(touch_idx+1, n) | |
| if (a := leg_separation_angle(window[sorted_frames[fi]].get("persons_3d", {}).get(target_player_id, {}), active_foot)) is not None] | |
| if fwd_angles: forward_weighted_angle = max(fwd_angles) | |
| post_buffer = [] | |
| for fi in range(touch_idx+1, n): | |
| fn = sorted_frames[fi] | |
| fd = window[fn] | |
| p3d, b3d = fd.get("persons_3d", {}), fd.get("balls_3d", {}) | |
| feat = compute_features(p3d, b3d, target_player_id) or {} | |
| if is_pass_like: | |
| _, ball_pos = find_ball(p3d, b3d, target_player_id) | |
| skel = p3d.get(target_player_id, {}) | |
| post_buffer.append({"frame": fn, "head_angle": feat.get("head_angle"), "body_to_ball_angle": body_to_ball_angle(skel, ball_pos)}) | |
| else: | |
| ld, rd = feat.get("l_foot_ball_distance") or 0.0, feat.get("r_foot_ball_distance") or 0.0 | |
| post_buffer.append({"frame": fn, "mid_feet_ball_dist": round((ld+rd)/2.0, 3)}) | |
| left_knee_angles.append(feat.get("l_knee_angle")) | |
| right_knee_angles.append(feat.get("r_knee_angle")) | |
| torso_pitch_angles.append(feat.get("trunc_pitch_angle")) | |
| head_angles.append(feat.get("head_angle")) | |
| lfd_p, rfd_p = feat.get("l_foot_ball_distance"), feat.get("r_foot_ball_distance") | |
| mid_foot_ball_distances.append(round((lfd_p+rfd_p)/2.0, 3) if lfd_p is not None and rfd_p is not None else None) | |
| left_right_foot_distances.append(feat.get("l_r_foot_distance")) | |
| report = { | |
| "active_foot": active_foot, "touch_frame": touch_frame_no, | |
| "pre_action": pre_entries, "action_frame": action_data, "post_action": post_buffer, | |
| "left_knee_angles": left_knee_angles, "right_knee_angles": right_knee_angles, | |
| "torso_pitch_angles": torso_pitch_angles, "head_angles": head_angles, | |
| "mid_foot_ball_distances": mid_foot_ball_distances, "left_right_foot_distances": left_right_foot_distances, | |
| } | |
| if is_pass_like: | |
| report["backward_weighted_angle"] = backward_weighted_angle | |
| report["forward_weighted_angle"] = forward_weighted_angle | |
| return report | |
| def run_pipeline(cameras, utils_paths, sizes, progress_cb=None): | |
| if len(cameras) < 2: | |
| raise ValueError("Insufficient Cameras For 3D-based Analysis") | |
| temp_caps = {} | |
| cameras_fps = {} | |
| cameras_frame_counts = {} | |
| cameras_sizes = {} | |
| for camera in cameras.keys(): | |
| temp_caps[camera] = cv2.VideoCapture(cameras[camera]) | |
| if not temp_caps[camera].isOpened(): | |
| for cap in temp_caps.values(): | |
| cap.release() | |
| raise ValueError(f"Camera {camera} video could not be opened by OpenCV") | |
| cameras_fps[camera] = temp_caps[camera].get(cv2.CAP_PROP_FPS) | |
| cameras_frame_counts[camera] = int(temp_caps[camera].get(cv2.CAP_PROP_FRAME_COUNT) or 0) | |
| cameras_sizes[camera] = ( | |
| int(temp_caps[camera].get(cv2.CAP_PROP_FRAME_WIDTH) or 0), | |
| int(temp_caps[camera].get(cv2.CAP_PROP_FRAME_HEIGHT) or 0), | |
| ) | |
| if any(v <= 0 for v in cameras_fps.values()): | |
| for cap in temp_caps.values(): | |
| cap.release() | |
| raise ValueError("One or more uploaded videos has an invalid FPS or could not be decoded") | |
| if any(v <= 0 for v in cameras_frame_counts.values()): | |
| for cap in temp_caps.values(): | |
| cap.release() | |
| raise ValueError("One or more uploaded videos has zero readable frames") | |
| if len(set(round(v) for v in cameras_fps.values())) != 1: | |
| for cap in temp_caps.values(): | |
| cap.release() | |
| raise ValueError("Not Compatible Frame Rates") | |
| fps = int(round(next(iter(cameras_fps.values())))) | |
| common_frame_count = min(cameras_frame_counts.values()) | |
| for cap in temp_caps.values(): | |
| cap.release() | |
| pose_model = VitInference(utils_paths['POSE_PATH'], None, model_name=None, dataset='coco_25', device='cpu', is_video=True) | |
| connections = ((17, 15), (15, 12), (18, 16), (16, 13), (12, 14), (13, 14), (6, 5), (7, 5), (6, 8), (7, 9), (8, 10), (9, 11), | |
| (1, 2), (0, 1), (0, 2), (1, 3), (2, 4), (17, 21), (18, 24), (19, 20), (22, 23), (19, 21), (22, 24), (5, 0), (6, 12), | |
| (7, 13), (17, 19), (17, 20), (18, 22), (18, 23), (19, 20), (20, 21), (19, 21), (22, 23), (23, 24), (22, 24)) | |
| Pn_dict, K_dict, dist_dict = prepare_cameras(utils_paths['CALIBRATION_PATH']) | |
| actions = load_action_intervals(utils_paths['ACTIONS_PATH'], fps) | |
| reports = [] | |
| for i, action in enumerate(actions): | |
| if action["start_frame"] >= common_frame_count: | |
| max_time = max(0.0, (common_frame_count - 1) / fps) | |
| reports.append({ | |
| "error": f"Action starts beyond the synchronized video range. Last common frame across all cameras is {common_frame_count - 1} ({max_time:.2f}s).", | |
| "action": action["label"] | |
| }) | |
| continue | |
| if action["end_frame"] >= common_frame_count: | |
| max_time = max(0.0, (common_frame_count - 1) / fps) | |
| reports.append({ | |
| "error": f"Action ends beyond the synchronized video range. Last common frame across all cameras is {common_frame_count - 1} ({max_time:.2f}s).", | |
| "action": action["label"] | |
| }) | |
| continue | |
| history = process_action_interval(cameras, sizes, action, utils_paths['YOLO_PATH'], pose_model, Pn_dict, K_dict, dist_dict, connections, progress_cb, i, len(actions)) | |
| if not history: | |
| reports.append({ | |
| "error": "No synchronized frames could be read for this interval. Check that all videos are decodable, aligned, and long enough for the selected timestamps.", | |
| "action": action["label"] | |
| }) | |
| continue | |
| middle = (action["start_frame"] + action["end_frame"]) // 2 | |
| target_pair = proximity_target(history, middle) | |
| if target_pair is None: | |
| reports.append({"error": "No player was detected near the ball in the designated window.", "action": action["label"]}) | |
| continue | |
| analytics = analyze(window=history, target_player_id=target_pair[1], action_type=action["label"]) | |
| if analytics is None: | |
| reports.append({"error": "Insufficient skeletal keypoints to compute biomechanical metrics.", "action": action["label"]}) | |
| continue | |
| skel_history = {frame_no: frame_data["persons_3d"].get(target_pair[1]) for frame_no, frame_data in history.items() if target_pair[1] in frame_data.get("persons_3d", {})} | |
| ball_history = {frame_no: frame_data["balls_3d"].get(target_pair[0]) for frame_no, frame_data in history.items() if target_pair[0] in frame_data.get("balls_3d", {})} | |
| reports.append({ | |
| "action": action["label"], | |
| "start_frame": action["start_frame"], | |
| "end_frame": action["end_frame"], | |
| "fps": fps, | |
| "target_player": target_pair[1], | |
| "skel_history": skel_history, | |
| "ball_history": ball_history, | |
| "analytics": analytics, | |
| }) | |
| return reports | |