SporalizeLabs-api / pipeline.py
Shoraky's picture
Use Shoot action label
c1170b3
import cv2
import math
import copy
import json
import numpy as np
import supervision as sv
from ultralytics import YOLO
import plotly.graph_objects as go
from collections import defaultdict, deque
from ViTPose.inference import VitInference
from scipy.optimize import linear_sum_assignment
def prepare_cameras(calibration_path):
data = np.load(calibration_path)
cam_ids = sorted({int(k[2:]) for k in data.files if k.startswith("Pn")})
Pn_dict, K_dict, dist_dict = {}, {}, {}
for cid in cam_ids:
p_key = f"Pn{cid}"
k_key = f"K{cid}"
d_key = f"dist{cid}"
if p_key in data.files and k_key in data.files and d_key in data.files:
Pn_dict[cid] = data[p_key]
K_dict[cid] = data[k_key]
dist_dict[cid] = data[d_key]
if not Pn_dict:
raise ValueError(f"No valid camera params found in: {calibration_path}")
return Pn_dict, K_dict, dist_dict
def rescale_points(persons, balls, current_size, target_size):
sx = target_size[0] / current_size[0]
sy = target_size[1] / current_size[1]
persons = {pid: {idx: (x * sx, y * sy) for idx, (x, y) in skel.items()} for pid, skel in persons.items()}
balls = {bid: (x1 * sx, y1 * sy, x2 * sx, y2 * sy) for bid, (x1, y1, x2, y2) in balls.items()}
return persons, balls
def detect_entities(frame, result, pose_model, target_size, margin_ratio=0.35):
def iou(a, b):
x1 = max(a[0], b[0])
y1 = max(a[1], b[1])
x2 = min(a[2], b[2])
y2 = min(a[3], b[3])
inter = max(0, x2 - x1) * max(0, y2 - y1)
area_a = max(0, a[2] - a[0]) * max(0, a[3] - a[1])
area_b = max(0, b[2] - b[0]) * max(0, b[3] - b[1])
union = area_a + area_b - inter
return inter / union if union > 0 else 0.0
def remove_overlaps(boxes, ids, confs, thr=0.65):
keep = []
for i in np.argsort(-confs):
if all(iou(boxes[i], boxes[j]) <= thr for j in keep):
keep.append(i)
return boxes[keep], [ids[i] for i in keep], confs[keep]
persons = {}
balls = {}
fh, fw = frame.shape[:2]
current_size = (fw, fh)
person_boxes = np.empty((0, 4), dtype=np.float32)
person_ids = []
person_confs = np.empty((0,), dtype=np.float32)
if result.boxes is not None and result.boxes.id is not None and len(result.boxes) > 0:
boxes = result.boxes.xyxy.cpu().numpy()
ids = result.boxes.id.int().cpu().tolist()
confs = result.boxes.conf.cpu().numpy()
class_ids = result.boxes.cls.int().cpu().tolist()
mask = np.array([(c >= 0.5) if cls == 0 else (c >= 0.25) for c, cls in zip(confs, class_ids)])
boxes = boxes[mask]
confs = confs[mask]
ids = [track_id for track_id, keep in zip(ids, mask) if keep]
class_ids = [class_id for class_id, keep in zip(class_ids, mask) if keep]
for box, track_id, score, class_id in zip(boxes, ids, confs, class_ids):
x1, y1, x2, y2 = box
track_id = int(track_id)
class_id = int(class_id)
if class_id == 0:
w = x2 - x1
h = y2 - y1
x1e = max(0, x1 - margin_ratio * w)
y1e = max(0, y1 - margin_ratio * h)
x2e = min(fw, x2 + margin_ratio * w)
y2e = min(fh, y2 + margin_ratio * h)
person_boxes = np.vstack([person_boxes, np.array([x1e, y1e, x2e, y2e])[None, :]])
person_ids.append(track_id)
person_confs = np.append(person_confs, score)
elif class_id == 32:
balls[track_id] = tuple(map(int, box))
if len(person_boxes):
person_boxes, person_ids, person_confs = remove_overlaps(person_boxes, person_ids, person_confs, 0.75)
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
poses = pose_model.inference_from_bboxes(frame_rgb, person_boxes, ids=person_ids, scores=person_confs.tolist())
for person_id, keypoints in poses.items():
skel = {}
for i, point in enumerate(keypoints):
y, x, score = point
skel[i] = (float(x), float(y))
persons[int(person_id)] = skel
persons, balls = rescale_points(persons, balls, current_size, target_size)
return persons, balls
def correspond(persons_2d, balls_2d, Pn_dict, K_dict, dist_dict, threshold_px=75):
MATCH_KPTS = list(range(25))
INF = 1e9
def calc_F(Pni, Pnj, Ki, Kj):
R1, t1 = Pni[:, :3], Pni[:, 3].reshape(3, 1)
R2, t2 = Pnj[:, :3], Pnj[:, 3].reshape(3, 1)
R21 = R2 @ R1.T
t21 = t2 - R21 @ t1
tx = np.array([
[0, -t21[2, 0], t21[1, 0]],
[t21[2, 0], 0, -t21[0, 0]],
[-t21[1, 0], t21[0, 0], 0 ]
])
E = tx @ R21
F = np.linalg.inv(Kj).T @ E @ np.linalg.inv(Ki)
U, S, Vt = np.linalg.svd(F)
S[2] = 0.0
F = U @ np.diag(S) @ Vt
return F / (F[2, 2] + 1e-12)
def undistort_pt(pt, K, dist):
pt_arr = np.array([[pt]], dtype=np.float64)
pt_ud = cv2.undistortPoints(pt_arr, K, dist, P=K)
return np.array([pt_ud[0, 0, 0], pt_ud[0, 0, 1], 1.0])
def sampson_dist(pti, ptj, Ki, Kj, disti, distj, F):
x1 = undistort_pt(pti, Ki, disti)
x2 = undistort_pt(ptj, Kj, distj)
Fx1 = F @ x1
Ftx2 = F.T @ x2
num = (x2 @ Fx1) ** 2
den = Fx1[0]**2 + Fx1[1]**2 + Ftx2[0]**2 + Ftx2[1]**2 + 1e-12
return float(np.sqrt(abs(num / den)))
def pair_cost_persons(ref_skl, cand_skl, Ki, Kj, di, dj, F, min_kpts=3):
dists = []
for k in MATCH_KPTS:
if k in ref_skl and k in cand_skl:
dists.append(sampson_dist(ref_skl[k], cand_skl[k], Ki, Kj, di, dj, F))
if len(dists) < min_kpts:
return INF
dists = np.array(dists, dtype=float)
if len(dists) >= 4:
med = np.median(dists)
mad = np.median(np.abs(dists - med))
if mad > 1e-6:
dists = dists[np.abs(dists - med) <= 2.5 * mad]
if len(dists) < min_kpts:
return INF
return float(np.median(dists))
def pair_cost_ball(rc, cc, Ki, Kj, di, dj, F):
return sampson_dist(rc, cc, Ki, Kj, di, dj, F)
def hungarian_match(cost_matrix, row_ids, col_ids):
if cost_matrix.size == 0:
return {}
row_ind, col_ind = linear_sum_assignment(cost_matrix)
matches = {}
for r, c in zip(row_ind, col_ind):
if cost_matrix[r, c] < threshold_px:
matches[row_ids[r]] = col_ids[c]
return matches
def apply_symmetry_gate(matches, cost_matrix, rid_idx, cids):
accepted = {}
for rid, cid in matches.items():
ri = rid_idx[rid]
ci = cids.index(cid)
forward_cost = cost_matrix[ri]
valid = forward_cost[forward_cost < INF]
if len(valid) >= 2:
sorted_costs = np.sort(valid)
if sorted_costs[1] < 1.3 * sorted_costs[0] and cost_matrix[ri, ci] > threshold_px * 0.4:
continue
accepted[rid] = cid
return accepted
def avg_cost(acc, cnt):
with np.errstate(invalid='ignore', divide='ignore'):
return np.where(cnt > 0, acc / cnt, INF)
def centre(b):
return ((b[0] + b[2]) / 2, (b[1] + b[3]) / 2)
cam_ids = sorted(persons_2d.keys())
cam0 = cam_ids[0]
next_person_id = max(persons_2d[cam0].keys(), default=-1) + 1
next_ball_id = max(balls_2d[cam0].keys(), default=-1) + 1
for idx in range(1, len(cam_ids)):
cam_j = cam_ids[idx]
curr_persons = persons_2d[cam_j]
curr_balls = balls_2d[cam_j]
p_rids = sorted(set(rid for ci in cam_ids[:idx] for rid in persons_2d[ci]))
p_cids = list(curr_persons.keys())
b_rids = sorted(set(rid for ci in cam_ids[:idx] for rid in balls_2d[ci]))
b_cids = list(curr_balls.keys())
P_acc = np.zeros((len(p_rids), len(p_cids))) if (p_rids and p_cids) else None
P_cnt = np.zeros_like(P_acc) if P_acc is not None else None
B_acc = np.zeros((len(b_rids), len(b_cids))) if (b_rids and b_cids) else None
B_cnt = np.zeros_like(B_acc) if B_acc is not None else None
p_rid_idx = {rid: i for i, rid in enumerate(p_rids)}
b_rid_idx = {rid: i for i, rid in enumerate(b_rids)}
for cam_i in cam_ids[:idx]:
F = calc_F(Pn_dict[cam_i], Pn_dict[cam_j], K_dict[cam_i], K_dict[cam_j])
Ki, Kj = K_dict[cam_i], K_dict[cam_j]
di, dj = dist_dict[cam_i], dist_dict[cam_j]
if P_acc is not None:
for rid, rskl in persons_2d[cam_i].items():
ri = p_rid_idx[rid]
for ci, cid in enumerate(p_cids):
c = pair_cost_persons(rskl, curr_persons[cid], Ki, Kj, di, dj, F)
if c < INF:
P_acc[ri, ci] += c
P_cnt[ri, ci] += 1
if B_acc is not None:
for rid, rb in balls_2d[cam_i].items():
ri = b_rid_idx[rid]
rc = centre(rb)
for ci, cid in enumerate(b_cids):
c = pair_cost_ball(rc, centre(curr_balls[cid]), Ki, Kj, di, dj, F)
if c < INF:
B_acc[ri, ci] += c
B_cnt[ri, ci] += 1
ordered_persons = {}
used_p = set()
if P_acc is not None:
P_cost = avg_cost(P_acc, P_cnt)
for rid, cid in apply_symmetry_gate(
hungarian_match(P_cost, p_rids, p_cids), P_cost, p_rid_idx, p_cids
).items():
ordered_persons[rid] = curr_persons[cid]
used_p.add(cid)
for cid in p_cids:
if cid not in used_p:
ordered_persons[next_person_id] = curr_persons[cid]
next_person_id += 1
ordered_balls = {}
used_b = set()
if B_acc is not None:
B_cost = avg_cost(B_acc, B_cnt)
for rid, cid in apply_symmetry_gate(
hungarian_match(B_cost, b_rids, b_cids), B_cost, b_rid_idx, b_cids
).items():
ordered_balls[rid] = curr_balls[cid]
used_b.add(cid)
for cid in b_cids:
if cid not in used_b:
ordered_balls[next_ball_id] = curr_balls[cid]
next_ball_id += 1
persons_2d[cam_j] = ordered_persons
balls_2d[cam_j] = ordered_balls
return persons_2d, balls_2d
def stereo(persons_2d, balls_2d, Pn_dict, K_dict, dist_dict):
def triangulate(points_2d, projection_matrices):
A = []
for i in range(len(points_2d)):
u, v = points_2d[i]
P = projection_matrices[i]
A.append(u * P[2, :] - P[0, :])
A.append(v * P[2, :] - P[1, :])
A = np.array(A)
_, _, Vh = np.linalg.svd(A)
X_h = Vh[-1, :]
return (X_h[:3] / X_h[3]).flatten()
def undistort_point(pt, K, dist):
pt_arr = np.array(pt, dtype=np.float32).reshape(1, 1, 2)
pt_ud = cv2.undistortPoints(pt_arr, K, dist, P=K)
return pt_ud[0, 0]
persons_3d = {}
all_person_ids = set()
for cam_persons in persons_2d.values(): all_person_ids.update(cam_persons.keys())
for pid in all_person_ids:
person_cams = {cid: skel for cid, c_pers in persons_2d.items() if pid in c_pers for skel in [c_pers[pid]]}
if len(person_cams) < 2: continue
all_kpt_indices = set()
for skel in person_cams.values(): all_kpt_indices.update(skel.keys())
person_3d = {}
for kpt_idx in all_kpt_indices:
pts_to_triangulate = []
pns_to_triangulate = []
for cid, skel in person_cams.items():
if kpt_idx in skel:
ud_pt = undistort_point(skel[kpt_idx], K_dict[cid], dist_dict[cid])
pts_to_triangulate.append(ud_pt)
P = K_dict[cid] @ Pn_dict[cid]
pns_to_triangulate.append(P)
if len(pts_to_triangulate) >= 2: person_3d[kpt_idx] = triangulate(pts_to_triangulate, pns_to_triangulate)
if person_3d:
persons_3d[pid] = person_3d
balls_3d = {}
all_ball_ids = set()
for cam_balls in balls_2d.values(): all_ball_ids.update(cam_balls.keys())
for bid in all_ball_ids:
pts_to_triangulate = []
pns_to_triangulate = []
for cid, c_balls in balls_2d.items():
if bid in c_balls:
bbox = c_balls[bid]
center = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)
ud_pt = undistort_point(center, K_dict[cid], dist_dict[cid])
pts_to_triangulate.append(ud_pt)
P = K_dict[cid] @ Pn_dict[cid]
pns_to_triangulate.append(P)
if len(pts_to_triangulate) >= 2: balls_3d[bid] = triangulate(pts_to_triangulate, pns_to_triangulate)
return persons_3d, balls_3d
def load_action_intervals(json_path, fps):
timecode_fps = max(1, int(round(float(fps))))
def timestamp_to_frame(timestamp, action_number, field_name):
if not isinstance(timestamp, str):
raise ValueError(f"Invalid time format in action {action_number}. {field_name} must use HH:MM:SS:FF.")
parts = timestamp.strip().split(":")
if len(parts) != 4:
raise ValueError(f'Invalid time format "{timestamp}" in action {action_number}. Use HH:MM:SS:FF.')
try:
h, m, s, f = [int(part) for part in parts]
except ValueError:
raise ValueError(f'Invalid time format "{timestamp}" in action {action_number}. Use integer timecode values.')
if h < 0 or m < 0 or m > 59 or s < 0 or s > 59 or f < 0:
raise ValueError(f'Invalid time format "{timestamp}" in action {action_number}. Minutes and seconds must be between 00 and 59.')
if f >= timecode_fps:
raise ValueError(
f'Invalid time format "{timestamp}" in action {action_number}. '
f'At {timecode_fps}fps, frame values must be between 00 and {timecode_fps - 1:02d}.'
)
return int((h * 3600 + m * 60 + s) * timecode_fps + f)
with open(json_path, "r", encoding="utf-8") as f:
data = json.load(f)
raw_actions = data.get("actions") if isinstance(data, dict) else None
if not isinstance(raw_actions, list) or not raw_actions:
raise ValueError("No actions array found.")
actions = []
for index, item in enumerate(raw_actions):
action_number = index + 1
if not isinstance(item, dict):
raise ValueError(f"Action {action_number} must be an object.")
label = item.get("label")
if not isinstance(label, str) or not label.strip():
raise ValueError(f"Action {action_number} must include a label.")
start_frame = timestamp_to_frame(item.get("start"), action_number, "start")
end_frame = timestamp_to_frame(item.get("end"), action_number, "end")
if end_frame < start_frame:
raise ValueError(f"Invalid action interval in action {action_number}. End must not be before start.")
actions.append({"label": label.strip(), "start_frame": start_frame, "end_frame": end_frame})
return actions
def process_action_interval(cameras, sizes, action, yolo_path, pose_model, Pn_dict, K_dict, dist_dict, connections, progress_cb=None, current_act=0, total_act=1):
cameras_caps = {}
cameras_persons = {}
cameras_balls = {}
cameras_models = {}
for camera in cameras.keys():
cap = cv2.VideoCapture(cameras[camera])
cameras_caps[camera] = cap
cameras_persons[camera] = None
cameras_balls[camera] = None
cameras_models[camera] = YOLO(yolo_path)
first_cap = next(iter(cameras_caps.values()))
infer_size = (int(first_cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(first_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
target_size = sizes['TARGET_SIZE']
yolo_imgsz = sizes['YOLO_IMGSZ']
start_frame = action["start_frame"]
end_frame = action["end_frame"]
for cap in cameras_caps.values():
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
history = {}
total_frames = end_frame - start_frame + 1
for step, frame_no in enumerate(range(start_frame, end_frame + 1)):
if progress_cb:
# Check if cancellation was requested
should_continue = progress_cb(current_act, total_act, step, max(1, total_frames))
if should_continue is False:
raise InterruptedError("Processing cancelled by user")
temp_cameras_frames = {}
cameras_frames = {}
cameras_rets = {}
for camera in cameras:
ret, frame = cameras_caps[camera].read()
temp_cameras_frames[camera] = frame
cameras_rets[camera] = ret
not_working_cameras = [camera for camera in cameras_rets if not cameras_rets[camera]]
if (len(cameras) - len(not_working_cameras)) < 2:
break
for camera in cameras:
if camera not in not_working_cameras:
cameras_frames[camera] = temp_cameras_frames[camera]
for camera in cameras_frames:
result = cameras_models[camera].track(cameras_frames[camera], persist=True, verbose=False, imgsz=yolo_imgsz)[0]
persons, balls = detect_entities(cameras_frames[camera], result, pose_model, target_size)
cameras_persons[camera] = persons
cameras_balls[camera] = balls
cameras_persons, cameras_balls = correspond(cameras_persons, cameras_balls, Pn_dict, K_dict, dist_dict)
persons_3d, balls_3d = stereo(cameras_persons, cameras_balls, Pn_dict, K_dict, dist_dict)
history[frame_no] = {
"persons_3d": copy.deepcopy(persons_3d),
"persons_2d": copy.deepcopy(cameras_persons),
"balls_3d": copy.deepcopy(balls_3d),
"balls_2d": copy.deepcopy(cameras_balls),
}
for cap in cameras_caps.values():
cap.release()
return history
def proximity_target(history, middle):
for i in range(middle-3, middle+4):
frame_data = history.get(i)
if frame_data is None:
continue
if (not frame_data['persons_3d']) or (not frame_data['balls_3d']):
continue
pairs = {}
for ball_id, ball_center in frame_data['balls_3d'].items():
closest_person = None
closest_distance = float('inf')
for person_id, person_skel in frame_data['persons_3d'].items():
person_center = (np.array(person_skel[17]) + np.array(person_skel[18])) / 2
distance = math.dist(person_center, ball_center)
if distance < closest_distance:
closest_distance = distance
closest_person = person_id
if closest_person is not None:
pairs[ball_id] = [closest_person, closest_distance]
global_closest_ball = None
global_closest_person = None
global_closest_distance = float('inf')
for ball_id, closest_details in pairs.items():
if closest_details[1] < global_closest_distance:
global_closest_person = closest_details[0]
global_closest_distance = closest_details[1]
global_closest_ball = ball_id
if global_closest_distance <= 150:
return (global_closest_ball, global_closest_person, global_closest_distance)
return None
def analyze(window, target_player_id, action_type, zone_entry_ball_pos=None):
def kpt(skel, i):
v = skel.get(i)
return np.array(v, float) if v is not None else None
def dist(p1, p2):
if p1 is None or p2 is None: return None
return float(np.linalg.norm(np.array(p1, float) - np.array(p2, float)))
def vec_angle(v1, v2):
v1, v2 = np.array(v1, float), np.array(v2, float)
n1, n2 = np.linalg.norm(v1), np.linalg.norm(v2)
if n1 < 1e-9 or n2 < 1e-9: return None
return round(float(np.degrees(np.arccos(np.clip(np.dot(v1/n1, v2/n2), -1.0, 1.0)))), 3)
def angle3(a, b, c):
if a is None or b is None or c is None: return None
return vec_angle(np.array(a, float)-np.array(b, float), np.array(c, float)-np.array(b, float))
def find_ball(p3d, b3d, pid):
if not b3d: return None, None
skel = p3d.get(pid, {})
# ViTPose 25: l_ankle=17, r_ankle=18
la, ra = kpt(skel, 17), kpt(skel, 18)
pivot = (la+ra)/2.0 if la is not None and ra is not None else None
best_bid, best_pos, best_d = None, None, float('inf')
for bid, bpos in b3d.items():
if bpos is None: continue
bpos = np.array(bpos, float)
d = float(np.linalg.norm(pivot - bpos)) if pivot is not None else 0.0
if d < best_d: best_d, best_bid, best_pos = d, bid, bpos
return best_bid, best_pos
def compute_features(p3d, b3d, pid):
skel = p3d.get(pid)
if skel is None: return None
def g(i): return kpt(skel, i)
# ViTPose 25 keypoint indices:
# 0=nose, 1=l_eye, 2=r_eye, 3=l_ear, 4=r_ear
# 5=neck
# 6=l_shoulder, 7=r_shoulder
# 8=l_elbow, 9=r_elbow
# 10=l_wrist, 11=r_wrist
# 12=l_hip, 13=r_hip
# 15=l_knee, 16=r_knee
# 17=l_ankle, 18=r_ankle
# 19=l_big_toe, 20=l_small_toe, 21=l_heel
# 22=r_big_toe, 23=r_small_toe, 24=r_heel
nose, l_eye, r_eye = g(0), g(1), g(2)
l_shoulder, r_shoulder = g(6), g(7)
l_hip, r_hip = g(12), g(13)
l_knee, r_knee = g(15), g(16)
l_ankle, r_ankle = g(17), g(18)
l_heel, r_heel = g(21), g(24)
l_toe, r_toe = g(19), g(22)
_, ball_center = find_ball(p3d, b3d, pid)
l_knee_angle = angle3(l_hip, l_knee, l_ankle)
r_knee_angle = angle3(r_hip, r_knee, r_ankle)
head_angle = mid_shoulder = mid_hip = None
if all(v is not None for v in (l_shoulder, r_shoulder, l_hip, r_hip)):
mid_shoulder = (l_shoulder + r_shoulder) / 2.0
mid_hip = (l_hip + r_hip) / 2.0
if nose is not None:
head_angle = angle3(nose, mid_shoulder, mid_hip)
trunc_pitch_angle = trunc_roll_angle = head_pitch_angle = head_roll_angle = None
if mid_shoulder is not None and mid_hip is not None:
Y = np.array([0.0, 1.0, 0.0])
Hv = np.array([l_hip[0], mid_hip[1], l_hip[2]]) - mid_hip
Hvn = np.linalg.norm(Hv) or 1.0
X_ = Hv / Hvn
cross = np.cross(X_, Y)
cn = np.linalg.norm(cross)
if cn < 1e-9: cross, cn = np.array([0.0, 0.0, 1.0]), 1.0
Z_ = cross / cn
R = np.column_stack((X_, Y, Z_))
def loc(pt): return R.T @ (pt - mid_hip)
msl = loc(mid_shoulder)
trunc_pitch_angle = angle3((0, msl[1], msl[2]), (0,0,0), (0,-1,0))
trunc_roll_angle = angle3((msl[0], msl[1], 0), (0,0,0), (0,-1,0))
if nose is not None and l_eye is not None and r_eye is not None:
meil = loc((l_eye + r_eye) / 2.0)
nloc = loc(nose)
head_pitch_angle = angle3((0, msl[1], msl[2]), (0, meil[1], meil[2]), (0, nloc[1], nloc[2]))
head_roll_angle = angle3((msl[0], msl[1], 0), (meil[0], meil[1], 0), (nloc[0], nloc[1], 0))
left_foot_orientation_angle = right_foot_orientation_angle = difference_in_angles = None
if all(v is not None for v in (l_hip, r_hip, l_heel, r_heel, l_toe, r_toe)):
body_dir = r_hip - l_hip
body_fwd = np.array([-body_dir[2], 0.0, body_dir[0]])
bfwd_2d = body_fwd[[0, 2]]
bn = np.linalg.norm(bfwd_2d)
if bn > 1e-6:
bfwd_2d /= bn
for foot_heel, foot_toe, side in ((l_heel, l_toe, "left"), (r_heel, r_toe, "right")):
fv = (foot_toe - foot_heel)[[0, 2]]
fn_ = np.linalg.norm(fv)
if fn_ > 1e-6:
fv /= fn_
ang = float(np.degrees(np.arccos(np.clip(np.dot(fv, bfwd_2d), -1.0, 1.0))))
if side == "left": left_foot_orientation_angle = round(ang, 3)
else: right_foot_orientation_angle = round(ang, 3)
if left_foot_orientation_angle is not None and right_foot_orientation_angle is not None:
difference_in_angles = abs(left_foot_orientation_angle - right_foot_orientation_angle)
l_r_foot_distance = None
if all(v is not None for v in (l_ankle, r_ankle, l_knee)):
akl = dist(l_ankle, l_knee)
if akl and akl > 0:
l_r_foot_distance = round(dist(l_ankle, r_ankle), 3)
l_foot_ball_distance = r_foot_ball_distance = None
if ball_center is not None:
if l_ankle is not None: l_foot_ball_distance = round(float(dist(l_ankle, ball_center)), 3)
if r_ankle is not None: r_foot_ball_distance = round(float(dist(r_ankle, ball_center)), 3)
return {
"head_angle": head_angle, "l_knee_angle": l_knee_angle, "r_knee_angle": r_knee_angle,
"trunc_pitch_angle": trunc_pitch_angle, "trunc_roll_angle": trunc_roll_angle,
"head_pitch_angle": head_pitch_angle, "head_roll_angle": head_roll_angle,
"left_foot_orientation_angle": left_foot_orientation_angle,
"right_foot_orientation_angle": right_foot_orientation_angle,
"difference_in_angles": difference_in_angles,
"l_r_foot_distance": l_r_foot_distance,
"l_foot_ball_distance": l_foot_ball_distance, "r_foot_ball_distance": r_foot_ball_distance,
"ball_position": ball_center.tolist() if ball_center is not None else None,
"l_ankle": l_ankle, "r_ankle": r_ankle,
}
def body_to_ball_angle(skel, ball_pos):
if ball_pos is None: return None
# ViTPose 25: l_shoulder=6, r_shoulder=7
l_shoulder, r_shoulder = kpt(skel, 6), kpt(skel, 7)
if l_shoulder is None or r_shoulder is None: return None
forward = r_shoulder - l_shoulder
forward[1] = 0.0
# ViTPose 25: l_ankle=17, r_ankle=18
l_ankle, r_ankle = kpt(skel, 17), kpt(skel, 18)
if l_ankle is None or r_ankle is None:
# fallback to heels: l_heel=21, r_heel=24
l_ankle, r_ankle = kpt(skel, 21), kpt(skel, 24)
feet_mid = (l_ankle + r_ankle) / 2.0
to_ball = ball_pos - feet_mid
to_ball[1] = 0.0
return vec_angle(forward, to_ball)
def body_vs_ball(skel, ball_travel_vec):
# ViTPose 25: l_hip=12, r_hip=13
lh, rh = kpt(skel, 12), kpt(skel, 13)
if lh is None or rh is None: return None
body_dir = rh - lh
forward = np.array([-body_dir[2], 0.0, body_dir[0]])
bv = np.array(ball_travel_vec, float).copy()
bv[1] = 0.0
return vec_angle(forward, bv)
def leg_separation_angle(skel, active_foot):
# ViTPose 25: l_heel=21, r_heel=24, l_hip=12, r_hip=13
l_heel, r_heel = kpt(skel, 21), kpt(skel, 24)
l_hip, r_hip = kpt(skel, 12), kpt(skel, 13)
if any(x is None for x in (l_heel, r_heel, l_hip, r_hip)): return None
if active_foot == "Left": return vec_angle(l_hip - l_heel, r_hip - r_heel)
return vec_angle(r_hip - r_heel, l_hip - l_heel)
def elbow_angle(skel, side):
# ViTPose 25: l_shoulder=6, l_elbow=8, l_wrist=10
# r_shoulder=7, r_elbow=9, r_wrist=11
if side == 'left':
shoulder, elbow, wrist = kpt(skel, 6), kpt(skel, 8), kpt(skel, 10)
else:
shoulder, elbow, wrist = kpt(skel, 7), kpt(skel, 9), kpt(skel, 11)
if any(x is None for x in (shoulder, elbow, wrist)): return None
return angle3(shoulder, elbow, wrist)
BALL_RADIUS_CM = 11
sorted_frames = sorted(window.keys())
n = len(sorted_frames)
zone_entry_ball_pos = np.array(zone_entry_ball_pos, float) if zone_entry_ball_pos is not None else None
if zone_entry_ball_pos is None:
for fn in sorted_frames:
fd = window[fn]
_, bpos = find_ball(fd.get("persons_3d", {}), fd.get("balls_3d", {}), target_player_id)
if bpos is not None: zone_entry_ball_pos = bpos; break
if action_type == "Dribble":
results = []
left_knee_angles, right_knee_angles, torso_pitch_angles, head_angles = [], [], [], []
mid_foot_ball_distances, left_right_foot_distances = [], []
for fn in sorted_frames:
fd = window[fn]
p3d, b3d = fd.get("persons_3d", {}), fd.get("balls_3d", {})
skel = p3d.get(target_player_id, {})
_, ball_pos = find_ball(p3d, b3d, target_player_id)
feat = compute_features(p3d, b3d, target_player_id) or {}
# ViTPose 25: l_ankle=17, r_ankle=18
l_ankle, r_ankle = kpt(skel, 17), kpt(skel, 18)
feet_mid = (l_ankle + r_ankle) / 2.0 if l_ankle is not None and r_ankle is not None else None
ball_feet_dist = round(float(np.linalg.norm(ball_pos - feet_mid)), 3) if ball_pos is not None and feet_mid is not None else None
results.append({"frame": fn, "ball_feet_distance": ball_feet_dist,
"trunk_pitch": feat.get("trunc_pitch_angle"), "trunk_roll": feat.get("trunc_roll_angle"),
"head_angle": feat.get("head_angle"),
"left_elbow_angle": elbow_angle(skel, 'left'), "right_elbow_angle": elbow_angle(skel, 'right')})
left_knee_angles.append(feat.get("l_knee_angle"))
right_knee_angles.append(feat.get("r_knee_angle"))
torso_pitch_angles.append(feat.get("trunc_pitch_angle"))
head_angles.append(feat.get("head_angle"))
lfd, rfd = feat.get("l_foot_ball_distance"), feat.get("r_foot_ball_distance")
mid_foot_ball_distances.append(round((lfd+rfd)/2.0, 3) if lfd is not None and rfd is not None else None)
left_right_foot_distances.append(feat.get("l_r_foot_distance"))
return {"frames": results, "left_knee_angles": left_knee_angles,
"right_knee_angles": right_knee_angles, "torso_pitch_angles": torso_pitch_angles,
"head_angles": head_angles, "mid_foot_ball_distances": mid_foot_ball_distances,
"left_right_foot_distances": left_right_foot_distances}
touch_idx = (n - 1) // 2
touch_frame_no = sorted_frames[touch_idx]
_touch_fd_tmp = window[sorted_frames[touch_idx]]
_, _action_ball_pos = find_ball(_touch_fd_tmp.get("persons_3d", {}), _touch_fd_tmp.get("balls_3d", {}), target_player_id)
fixed_ball_travel_vec = (_action_ball_pos - zone_entry_ball_pos) if zone_entry_ball_pos is not None and _action_ball_pos is not None else None
first_skel = window[sorted_frames[0]].get("persons_3d", {}).get(target_player_id, {})
contact_skel = window[touch_frame_no].get("persons_3d", {}).get(target_player_id, {})
# ViTPose 25: l_ankle=17, r_ankle=18
fl, fr = kpt(first_skel, 17), kpt(first_skel, 18)
cl, cr = kpt(contact_skel, 17), kpt(contact_skel, 18)
active_foot = "Left"
if all(v is not None for v in (fl, fr, cl, cr)):
active_foot = "Left" if (dist(cl, fl) or 0.0) > (dist(cr, fr) or 0.0) else "Right"
pre_start_idx = 0
pre_entries = []
left_knee_angles, right_knee_angles, torso_pitch_angles, head_angles = [], [], [], []
mid_foot_ball_distances, left_right_foot_distances = [], []
is_pass_like = action_type in ("Pass", "Shoot")
for fi in range(pre_start_idx, touch_idx):
fn = sorted_frames[fi]
fd = window[fn]
p3d, b3d = fd.get("persons_3d", {}), fd.get("balls_3d", {})
feat = compute_features(p3d, b3d, target_player_id) or {}
_, ball_pos = find_ball(p3d, b3d, target_player_id)
skel = p3d.get(target_player_id, {})
if is_pass_like:
pre_entries.append({"frame": fn, "body_to_ball_angle": body_to_ball_angle(skel, ball_pos)})
else:
bov = body_vs_ball(skel, fixed_ball_travel_vec) if fixed_ball_travel_vec is not None else None
pre_entries.append({"frame": fn, "body_orientation_vs_ball": bov, "head_angle": feat.get("head_angle")})
left_knee_angles.append(feat.get("l_knee_angle"))
right_knee_angles.append(feat.get("r_knee_angle"))
torso_pitch_angles.append(feat.get("trunc_pitch_angle"))
head_angles.append(feat.get("head_angle"))
lfd, rfd = feat.get("l_foot_ball_distance"), feat.get("r_foot_ball_distance")
mid_foot_ball_distances.append(round((lfd+rfd)/2.0, 3) if lfd is not None and rfd is not None else None)
left_right_foot_distances.append(feat.get("l_r_foot_distance"))
touch_fd = window[touch_frame_no]
touch_p3d, touch_b3d = touch_fd.get("persons_3d", {}), touch_fd.get("balls_3d", {})
touch_skel = touch_p3d.get(target_player_id, {})
_, touch_ball = find_ball(touch_p3d, touch_b3d, target_player_id)
touch_feat = compute_features(touch_p3d, touch_b3d, target_player_id) or {}
action_body_ball_angle = body_to_ball_angle(touch_skel, touch_ball)
left_knee_angles.append(touch_feat.get("l_knee_angle"))
right_knee_angles.append(touch_feat.get("r_knee_angle"))
torso_pitch_angles.append(touch_feat.get("trunc_pitch_angle"))
head_angles.append(touch_feat.get("head_angle"))
lfd_a, rfd_a = touch_feat.get("l_foot_ball_distance"), touch_feat.get("r_foot_ball_distance")
mid_foot_ball_distances.append(round((lfd_a+rfd_a)/2.0, 3) if lfd_a is not None and rfd_a is not None else None)
left_right_foot_distances.append(touch_feat.get("l_r_foot_distance"))
if is_pass_like:
action_data = {
"body_to_ball_angle": action_body_ball_angle,
"l_r_foot_distance": touch_feat.get("l_r_foot_distance"),
"trunc_pitch_angle": touch_feat.get("trunc_pitch_angle"),
"trunc_roll_angle": touch_feat.get("trunc_roll_angle"),
"left_foot_orientation_angle": touch_feat.get("left_foot_orientation_angle"),
"right_foot_orientation_angle": touch_feat.get("right_foot_orientation_angle"),
"difference_in_angles": touch_feat.get("difference_in_angles"),
"l_knee_angle": touch_feat.get("l_knee_angle"),
"r_knee_angle": touch_feat.get("r_knee_angle"),
"head_angle": touch_feat.get("head_angle"),
"head_pitch_angle": touch_feat.get("head_pitch_angle"),
"head_roll_angle": touch_feat.get("head_roll_angle"),
"active_foot": active_foot,
}
else:
bov_vals = [e["body_orientation_vs_ball"] for e in pre_entries if e.get("body_orientation_vs_ball") is not None]
action_data = {
"head_angle": touch_feat.get("head_angle"),
"l_knee_angle": touch_feat.get("l_knee_angle"),
"r_knee_angle": touch_feat.get("r_knee_angle"),
"trunc_pitch_angle": touch_feat.get("trunc_pitch_angle"),
"trunc_roll_angle": touch_feat.get("trunc_roll_angle"),
"left_foot_orientation_angle": touch_feat.get("left_foot_orientation_angle"),
"right_foot_orientation_angle": touch_feat.get("right_foot_orientation_angle"),
"difference_in_angles": touch_feat.get("difference_in_angles"),
"l_r_foot_distance": touch_feat.get("l_r_foot_distance"),
"avg_body_orientation_vs_ball": round(sum(bov_vals)/len(bov_vals), 3) if bov_vals else None,
}
if touch_skel and touch_ball is not None and zone_entry_ball_pos is not None:
btv = touch_ball - zone_entry_ball_pos
action_data["body_orientation_vs_ball"] = body_vs_ball(touch_skel, btv)
if touch_skel and touch_ball is not None:
# ViTPose 25: l_heel=21, r_heel=24
heel = kpt(touch_skel, 21 if active_foot == "Left" else 24)
if heel is not None:
action_data["active_foot_height_pct"] = round(float(heel[1] - touch_ball[1]) / BALL_RADIUS_CM * 100.0, 3)
backward_weighted_angle = forward_weighted_angle = None
if is_pass_like:
bwd_angles = [a for fi in range(pre_start_idx, touch_idx+1)
if (a := leg_separation_angle(window[sorted_frames[fi]].get("persons_3d", {}).get(target_player_id, {}), active_foot)) is not None]
if bwd_angles: backward_weighted_angle = max(bwd_angles)
fwd_angles = [a for fi in range(touch_idx+1, n)
if (a := leg_separation_angle(window[sorted_frames[fi]].get("persons_3d", {}).get(target_player_id, {}), active_foot)) is not None]
if fwd_angles: forward_weighted_angle = max(fwd_angles)
post_buffer = []
for fi in range(touch_idx+1, n):
fn = sorted_frames[fi]
fd = window[fn]
p3d, b3d = fd.get("persons_3d", {}), fd.get("balls_3d", {})
feat = compute_features(p3d, b3d, target_player_id) or {}
if is_pass_like:
_, ball_pos = find_ball(p3d, b3d, target_player_id)
skel = p3d.get(target_player_id, {})
post_buffer.append({"frame": fn, "head_angle": feat.get("head_angle"), "body_to_ball_angle": body_to_ball_angle(skel, ball_pos)})
else:
ld, rd = feat.get("l_foot_ball_distance") or 0.0, feat.get("r_foot_ball_distance") or 0.0
post_buffer.append({"frame": fn, "mid_feet_ball_dist": round((ld+rd)/2.0, 3)})
left_knee_angles.append(feat.get("l_knee_angle"))
right_knee_angles.append(feat.get("r_knee_angle"))
torso_pitch_angles.append(feat.get("trunc_pitch_angle"))
head_angles.append(feat.get("head_angle"))
lfd_p, rfd_p = feat.get("l_foot_ball_distance"), feat.get("r_foot_ball_distance")
mid_foot_ball_distances.append(round((lfd_p+rfd_p)/2.0, 3) if lfd_p is not None and rfd_p is not None else None)
left_right_foot_distances.append(feat.get("l_r_foot_distance"))
report = {
"active_foot": active_foot, "touch_frame": touch_frame_no,
"pre_action": pre_entries, "action_frame": action_data, "post_action": post_buffer,
"left_knee_angles": left_knee_angles, "right_knee_angles": right_knee_angles,
"torso_pitch_angles": torso_pitch_angles, "head_angles": head_angles,
"mid_foot_ball_distances": mid_foot_ball_distances, "left_right_foot_distances": left_right_foot_distances,
}
if is_pass_like:
report["backward_weighted_angle"] = backward_weighted_angle
report["forward_weighted_angle"] = forward_weighted_angle
return report
def run_pipeline(cameras, utils_paths, sizes, progress_cb=None):
if len(cameras) < 2:
raise ValueError("Insufficient Cameras For 3D-based Analysis")
temp_caps = {}
cameras_fps = {}
cameras_frame_counts = {}
cameras_sizes = {}
for camera in cameras.keys():
temp_caps[camera] = cv2.VideoCapture(cameras[camera])
if not temp_caps[camera].isOpened():
for cap in temp_caps.values():
cap.release()
raise ValueError(f"Camera {camera} video could not be opened by OpenCV")
cameras_fps[camera] = temp_caps[camera].get(cv2.CAP_PROP_FPS)
cameras_frame_counts[camera] = int(temp_caps[camera].get(cv2.CAP_PROP_FRAME_COUNT) or 0)
cameras_sizes[camera] = (
int(temp_caps[camera].get(cv2.CAP_PROP_FRAME_WIDTH) or 0),
int(temp_caps[camera].get(cv2.CAP_PROP_FRAME_HEIGHT) or 0),
)
if any(v <= 0 for v in cameras_fps.values()):
for cap in temp_caps.values():
cap.release()
raise ValueError("One or more uploaded videos has an invalid FPS or could not be decoded")
if any(v <= 0 for v in cameras_frame_counts.values()):
for cap in temp_caps.values():
cap.release()
raise ValueError("One or more uploaded videos has zero readable frames")
if len(set(round(v) for v in cameras_fps.values())) != 1:
for cap in temp_caps.values():
cap.release()
raise ValueError("Not Compatible Frame Rates")
fps = int(round(next(iter(cameras_fps.values()))))
common_frame_count = min(cameras_frame_counts.values())
for cap in temp_caps.values():
cap.release()
pose_model = VitInference(utils_paths['POSE_PATH'], None, model_name=None, dataset='coco_25', device='cpu', is_video=True)
connections = ((17, 15), (15, 12), (18, 16), (16, 13), (12, 14), (13, 14), (6, 5), (7, 5), (6, 8), (7, 9), (8, 10), (9, 11),
(1, 2), (0, 1), (0, 2), (1, 3), (2, 4), (17, 21), (18, 24), (19, 20), (22, 23), (19, 21), (22, 24), (5, 0), (6, 12),
(7, 13), (17, 19), (17, 20), (18, 22), (18, 23), (19, 20), (20, 21), (19, 21), (22, 23), (23, 24), (22, 24))
Pn_dict, K_dict, dist_dict = prepare_cameras(utils_paths['CALIBRATION_PATH'])
actions = load_action_intervals(utils_paths['ACTIONS_PATH'], fps)
reports = []
for i, action in enumerate(actions):
if action["start_frame"] >= common_frame_count:
max_time = max(0.0, (common_frame_count - 1) / fps)
reports.append({
"error": f"Action starts beyond the synchronized video range. Last common frame across all cameras is {common_frame_count - 1} ({max_time:.2f}s).",
"action": action["label"]
})
continue
if action["end_frame"] >= common_frame_count:
max_time = max(0.0, (common_frame_count - 1) / fps)
reports.append({
"error": f"Action ends beyond the synchronized video range. Last common frame across all cameras is {common_frame_count - 1} ({max_time:.2f}s).",
"action": action["label"]
})
continue
history = process_action_interval(cameras, sizes, action, utils_paths['YOLO_PATH'], pose_model, Pn_dict, K_dict, dist_dict, connections, progress_cb, i, len(actions))
if not history:
reports.append({
"error": "No synchronized frames could be read for this interval. Check that all videos are decodable, aligned, and long enough for the selected timestamps.",
"action": action["label"]
})
continue
middle = (action["start_frame"] + action["end_frame"]) // 2
target_pair = proximity_target(history, middle)
if target_pair is None:
reports.append({"error": "No player was detected near the ball in the designated window.", "action": action["label"]})
continue
analytics = analyze(window=history, target_player_id=target_pair[1], action_type=action["label"])
if analytics is None:
reports.append({"error": "Insufficient skeletal keypoints to compute biomechanical metrics.", "action": action["label"]})
continue
skel_history = {frame_no: frame_data["persons_3d"].get(target_pair[1]) for frame_no, frame_data in history.items() if target_pair[1] in frame_data.get("persons_3d", {})}
ball_history = {frame_no: frame_data["balls_3d"].get(target_pair[0]) for frame_no, frame_data in history.items() if target_pair[0] in frame_data.get("balls_3d", {})}
reports.append({
"action": action["label"],
"start_frame": action["start_frame"],
"end_frame": action["end_frame"],
"fps": fps,
"target_player": target_pair[1],
"skel_history": skel_history,
"ball_history": ball_history,
"analytics": analytics,
})
return reports