""" Front-Facing Golf Swing Metrics Calculator This module contains all the mathematical calculations for front-facing golf swing metrics. The metrics_calculator.py file handles DTL (Down-the-Line) view metrics. """ import math import numpy as np import cv2 from typing import Dict, List, Tuple, Optional, Union # Landmark indices L_HIP, R_HIP = 23, 24 L_SHO, R_SHO = 11, 12 L_EYE, R_EYE = 2, 5 # Debug helper - set to True to see detailed metric calculations VERBOSE = False def set_debug(on: bool = True): global VERBOSE VERBOSE = bool(on) def dbg(msg): if VERBOSE: print(f"DEBUG: {msg}") # Version tag for tracking code changes METRICS_VERSION = "front-ff v2025-09-26-added-hip-shoulder-separation-4-metrics" def _best_foot_pair(lm, want_world=False): """ Return a robust (left,right) foot pair tuple for TOES/FOOT-INDEX with highest availability. In MediaPipe Pose: 29 = L_HEEL, 30 = R_HEEL, 31 = L_FOOT_INDEX (toe tip), 32 = R_FOOT_INDEX (toe tip) We prefer (31,32). If missing, fall back to (29,30). """ if lm is None: return None # World landmarks have 3D tuples; image landmarks have [x,y,vis] def ok(pt): if not pt: return False if want_world: return len(pt) >= 3 and all(np.isfinite(pt[:3])) else: return len(pt) >= 3 and pt[2] >= 0.4 and np.isfinite(pt[0]) and np.isfinite(pt[1]) # Prefer FOOT_INDEX (toes) l_idx, r_idx = 31, 32 if (len(lm) > r_idx) and ok(lm[l_idx]) and ok(lm[r_idx]): return (l_idx, r_idx) # Fall back to HEEL l_idx, r_idx = 29, 30 if (len(lm) > r_idx) and ok(lm[l_idx]) and ok(lm[r_idx]): return (l_idx, r_idx) return None def _toe_line_unit_img(lm, H, W, roll_deg): """ 2D: return unit toe-line vector in the de-rolled image frame. Uses best available (31,32) or (29,30). Returns None if unavailable. """ pair = _best_foot_pair(lm, want_world=False) if not pair: return None iL, iR = pair dx = (lm[iR][0] - lm[iL][0]) * (W if max(abs(lm[iR][0]-lm[iL][0]), abs(lm[iR][1]-lm[iL][1])) <= 2.0 else 1.0) dy = (lm[iR][1] - lm[iL][1]) * (H if max(abs(lm[iR][0]-lm[iL][0]), abs(lm[iR][1]-lm[iL][1])) <= 2.0 else 1.0) cr, sr = math.cos(math.radians(roll_deg)), math.sin(math.radians(roll_deg)) tx, ty = (cr*dx + sr*dy, -sr*dx + cr*dy) # de-rolled n = math.hypot(tx, ty) if n < 1e-6: return None return (tx/n, ty/n) def _toe_line_unit_world(wf): """ 3D/XZ: return unit toe-line vector (R-L) in XZ. Prefers (31,32), else (29,30). """ pair = _best_foot_pair(wf, want_world=True) if not pair: return None iL, iR = pair vx = wf[iR][0] - wf[iL][0] vz = wf[iR][2] - wf[iL][2] n = math.hypot(vx, vz) if n < 1e-6: return None return (vx/n, vz/n) # Core math utilities def wrap180(a): return (a + 180.0) % 360.0 - 180.0 def unit2(v): """Normalize 2D vector""" n = (v[0]*v[0] + v[1]*v[1])**0.5 return (v[0]/n, v[1]/n) if n > 1e-6 else (0.0, 1.0) def signed_angle2(u, v): """Signed angle between two 2D vectors in degrees (-180, 180]""" dot = u[0]*v[0] + u[1]*v[1] det = u[0]*v[1] - u[1]*v[0] return math.degrees(math.atan2(det, dot)) def _eye_roll_deg(lx, ly, rx, ry): """Calculate angle of the line from right eye to left eye""" return math.degrees(math.atan2(ly - ry, lx - rx)) def _midpt(a, b): """Calculate midpoint of two 2D points""" return ((a[0] + b[0]) * 0.5, (a[1] + b[1]) * 0.5) def smooth_landmarks(pose_data: Dict[int, List], alpha: float = 0.35) -> Dict[int, List]: """Apply exponential moving average smoothing to landmarks""" if not pose_data: return pose_data frame_indices = sorted(pose_data.keys()) smoothed_data = {} for frame_idx in frame_indices: landmarks = pose_data[frame_idx] if not landmarks or not isinstance(landmarks, list): continue smoothed_landmarks = [] for i, lm in enumerate(landmarks): if lm and isinstance(lm, list) and len(lm) >= 3: x, y, vis = lm[0], lm[1], lm[2] # Get previous smoothed values if frame_idx > 0 and frame_idx - 1 in smoothed_data: prev_lms = smoothed_data[frame_idx - 1] if (i < len(prev_lms) and prev_lms[i] and isinstance(prev_lms[i], list) and len(prev_lms[i]) >= 3): px, py, pv = prev_lms[i][0], prev_lms[i][1], prev_lms[i][2] x = alpha * x + (1 - alpha) * px y = alpha * y + (1 - alpha) * py vis = alpha * vis + (1 - alpha) * pv smoothed_landmarks.append([x, y, vis]) else: smoothed_landmarks.append(lm) smoothed_data[frame_idx] = smoothed_landmarks return smoothed_data def has_world_ok_scale(world_landmarks, frame_idx=None): """Check if world landmarks have reasonable scale (relaxed ranges for hip/shoulder span)""" if not world_landmarks: return False frames_to_check = [frame_idx] if frame_idx is not None else list(world_landmarks.keys())[:5] for f in frames_to_check: if f not in world_landmarks: continue wf = world_landmarks[f] if not wf or len(wf) < 25: continue # Check hip span (relaxed range: 0.20-0.70m) if wf[23] and wf[24]: # L_HIP, R_HIP hip_span = math.hypot(wf[24][0] - wf[23][0], wf[24][2] - wf[23][2]) if 0.20 <= hip_span <= 0.70: dbg(f"[SCALE] Hip span {hip_span:.3f}m is reasonable") return True # Check shoulder span (relaxed range: 0.25-0.70m) if wf[11] and wf[12]: # L_SHO, R_SHO sho_span = math.hypot(wf[12][0] - wf[11][0], wf[12][2] - wf[11][2]) if 0.25 <= sho_span <= 0.70: dbg(f"[SCALE] Shoulder span {sho_span:.3f}m is reasonable") return True # Debug why scale check failed if frames_to_check: f = frames_to_check[0] if f in world_landmarks: wf = world_landmarks[f] if wf and len(wf) >= 25: if wf[23] and wf[24]: hip_span = math.hypot(wf[24][0] - wf[23][0], wf[24][2] - wf[23][2]) dbg(f"[SCALE] Hip span {hip_span:.3f}m out of range [0.20, 0.70]") if wf[11] and wf[12]: sho_span = math.hypot(wf[12][0] - wf[11][0], wf[12][2] - wf[11][2]) dbg(f"[SCALE] Shoulder span {sho_span:.3f}m out of range [0.25, 0.70]") return False def unit3(v): """Normalize 3D vector""" n = math.sqrt(v[0]*v[0] + v[1]*v[1] + v[2]*v[2]) return np.array(v) / n if n > 1e-6 else np.array([0.0, 0.0, 1.0]) def world_vec(world_landmarks, frame_idx, landmark_name): """Get world vector for landmark""" landmark_map = {'C7': 0, 'L_HIP': 23, 'R_HIP': 24} # C7 approximated as nose for now if landmark_name == 'C7': # Approximate C7 as midpoint between shoulders raised slightly if (frame_idx in world_landmarks and len(world_landmarks[frame_idx]) > 12 and world_landmarks[frame_idx][11] and world_landmarks[frame_idx][12]): l_sho = np.array(world_landmarks[frame_idx][11][:3]) r_sho = np.array(world_landmarks[frame_idx][12][:3]) # Raise midpoint slightly to approximate C7 c7_approx = (l_sho + r_sho) / 2 + np.array([0, 0.08, 0]) # 8cm higher return c7_approx idx = landmark_map.get(landmark_name) if idx is None or frame_idx not in world_landmarks: return None wf = world_landmarks[frame_idx] if not wf or len(wf) <= idx or not wf[idx]: return None return np.array(wf[idx][:3]) def any_nan(arr): """Check if array contains any NaN values""" return np.any(np.isnan(arr)) if arr is not None else True def calibrate_sidebend_sign(world_landmarks, impact_idx, handedness, fwd_hat_3d): """ Returns +1 or -1: multiplier to convert 2D dx sign into 'away-from-target = +' Uses short window around impact to vote with 3D sign, falls back to handed rule. """ if not has_world_ok_scale(world_landmarks): return 1 if handedness == 'right' else -1 W = range(max(0, impact_idx-2), min(len(list(world_landmarks.keys())), impact_idx+3)) votes = 0 for i in W: if i not in world_landmarks: continue c7 = world_vec(world_landmarks, i, 'C7') l_hip = world_vec(world_landmarks, i, 'L_HIP') r_hip = world_vec(world_landmarks, i, 'R_HIP') if c7 is None or l_hip is None or r_hip is None: continue hip = (l_hip + r_hip) * 0.5 spine = unit3(c7 - hip) # lateral-right unit on ground plane from forward lat = unit3(np.cross([0,1,0], fwd_hat_3d)) s3d = np.sign(np.dot(spine, lat)) # + means leaning to player's right votes += s3d # majority 3D says which side is player-right; convert to 'away from target = +' # For RH, away-from-target == player-right; for LH, opposite. prefer_player_right_positive = np.sign(votes) if votes != 0 else 1 return prefer_player_right_positive if handedness == 'right' else -prefer_player_right_positive def calculate_torso_sidebend_at_impact(pose_data, swing_phases, frames=None, image_shape=None, handedness='right', world_landmarks=None): """ Front-facing: torso side-bend in degrees at impact. Defined as angle between (neck midpoint → hip midpoint) and screen-vertical, computed in derolled screen space. """ dbg(f"[SB] handed={handedness}") # --- Resolve impact frame (with a tiny +δ tolerance) --- impact_frames = swing_phases.get("impact", []) if impact_frames: impact_idx = int(round(impact_frames[0])) else: # Fallback: find impact using peak hand speed if no swing phases provided impact_idx = _find_impact_frame(pose_data, world_landmarks) if impact_idx is None: impact_idx = sorted(pose_data.keys())[-1] if pose_data else None # Surgical asserts to guarantee true impact frame assert impact_idx is not None, "[SB] impact_idx None (missing world_landmarks pass-thru?)" n = len(pose_data["landmarks"]) if "landmarks" in pose_data else len(pose_data) assert impact_idx < n-1, f"[SB] Using last frame ({impact_idx}/{n-1}) — not true impact" dbg(f"[SB] impact_idx={impact_idx} of {n} frames (OK)") if impact_idx is None: dbg("[TORSO] No impact frame found") return {"metric": "torso_side_bend_deg", "value": None, "quality": "no-data"} num_frames = len(pose_data["landmarks"]) if "landmarks" in pose_data else len(pose_data) # TEMPORAL SMOOTHING: Use ±2-frame window around impact for better timing candidates = [impact_idx - 2, impact_idx - 1, impact_idx, impact_idx + 1, impact_idx + 2] candidates = [i for i in candidates if 0 <= i < num_frames and i in pose_data] # --- Improved de-roll using eyes + hips (not eyes-only) to reduce drift --- lo = max(0, impact_idx - 4) hi = min(num_frames - 1, impact_idx + 5) combined_rolls = [] for i in range(lo, hi + 1): if i not in pose_data: continue L_EYE, R_EYE = 2, 5 L_HIP, R_HIP = 23, 24 lm = pose_data[i] if not lm or len(lm) <= max(R_EYE, R_HIP): continue # Collect both eye and hip roll angles frame_rolls = [] # Eye roll le, re = lm[L_EYE], lm[R_EYE] if le is not None and re is not None: (lx, ly), (rx, ry) = le[:2], re[:2] frame_rolls.append(_eye_roll_deg(lx, ly, rx, ry)) # Hip roll lh, rh = lm[L_HIP], lm[R_HIP] if lh is not None and rh is not None: (lx, ly), (rx, ry) = lh[:2], rh[:2] frame_rolls.append(_eye_roll_deg(lx, ly, rx, ry)) # Use median of available rolls for this frame if frame_rolls: combined_rolls.append(np.median(frame_rolls)) roll_deg = 0.0 if combined_rolls: arr = np.array(combined_rolls, dtype=float) lo_q, hi_q = np.percentile(arr, [15, 85]) arr = arr[(arr >= lo_q) & (arr <= hi_q)] roll_deg = float(np.median(np.clip(arr, -25.0, 25.0))) dbg(f"[TORSO] eyes+hips roll (trimmed median) = {roll_deg:.1f}°") # --- Choose best candidate (prefer middle of temporal window) --- valid_candidates = [] L_SHO, R_SHO, L_HIP, R_HIP = 11, 12, 23, 24 for i in candidates: lm = pose_data[i] if not lm or len(lm) <= max(L_SHO, R_SHO, L_HIP, R_HIP): continue ls = lm[L_SHO] rs = lm[R_SHO] lh = lm[L_HIP] rh = lm[R_HIP] if None not in (ls, rs, lh, rh): valid_candidates.append(i) if not valid_candidates: dbg("[TORSO] No valid landmarks around impact") return {"metric": "torso_side_bend_deg", "value": None, "quality": "no-data"} # TEMPORAL MEDIAN: Calculate side-bend from all valid frames for stability def _get_HW(): if frames: H, W = frames[0].shape[:2]; return float(H), float(W) if image_shape and len(image_shape) >= 2: H, W = image_shape[:2]; return float(H), float(W) return None, None H, W = _get_HW() cx, cy = (W / 2.0 if W else 0.0), (H / 2.0 if H else 0.0) rad = math.radians(-roll_deg) cr, sr = math.cos(rad), math.sin(rad) def _deroll(pt): x, y = pt[:2] # Handle both [x,y] and [x,y,vis] formats x0, y0 = x - cx, y - cy xr = cr * x0 - sr * y0 + cx yr = sr * x0 + cr * y0 + cy return (xr, yr) # Calculate side-bend for all valid frames and take median L_SHO, R_SHO, L_HIP, R_HIP = 11, 12, 23, 24 sidebend_values = [] for frame_idx in valid_candidates: lm = pose_data[frame_idx] ls = _deroll(lm[L_SHO]) rs = _deroll(lm[R_SHO]) lh = _deroll(lm[L_HIP]) rh = _deroll(lm[R_HIP]) neck = _midpt(ls, rs) midhip = _midpt(lh, rh) dx = neck[0] - midhip[0] dy = neck[1] - midhip[1] # y grows downward # 2D frontal-plane magnitude only dy = abs(dy) + 1e-6 # Ensure positive denominator sidebend_abs_frame = math.degrees(math.atan2(abs(dx), dy)) sidebend_values.append((dx, sidebend_abs_frame)) # Store both dx and magnitude # Use median magnitude and median dx sign if sidebend_values: dx_values = [sv[0] for sv in sidebend_values] abs_values = [sv[1] for sv in sidebend_values] dx = np.median(dx_values) # Median dx for sign sidebend_abs = np.median(abs_values) # Median magnitude chosen = valid_candidates[len(valid_candidates)//2] # For debugging reference else: return {"metric": "torso_side_bend_deg", "value": None, "quality": "no-data"} # Get forward direction for sign calibration fwd_hat_3d = None if world_landmarks is not None: setup_frames = swing_phases.get("setup", [])[:5] or sorted(world_landmarks.keys())[:5] for f in setup_frames: if f in world_landmarks: toe_vec_3d = _toe_line_unit_world(world_landmarks[f]) if toe_vec_3d is not None: # Forward perpendicular to toe line in XZ plane fwd_hat_3d = np.array([-toe_vec_3d[1], 0, toe_vec_3d[0]]) break # Standardized sign convention: positive toward target for right-handed sign_2d_raw = 1.0 if dx > 0 else -1.0 cal = calibrate_sidebend_sign(world_landmarks, impact_idx, handedness, fwd_hat_3d) # Apply standardized sign convention: positive = away from target # For right-handed: away from target = toward player's right # For left-handed: away from target = toward player's left final_sign = sign_2d_raw * cal sidebend_deg = final_sign * sidebend_abs # Guardrails: clip extreme values but warn if abs(sidebend_deg) > 30: dbg(f"[SB] rejecting |sb|={abs(sidebend_deg):.1f}° > 30°, check roll/coords") sidebend_deg = math.copysign(30.0, sidebend_deg) # Debug output matching expected format dbg(f"[SB] 2D mag={sidebend_abs:.1f}°, cal={cal:+.0f}, final={sidebend_deg:.1f}°") dbg(f"[TORSO] side-bend = {sidebend_deg:.1f}° (dx={dx:.1f}, dy={abs(dy):.1f}, abs={sidebend_abs:.1f}°)") # Optional: shoulder drop % of head height (kept because it's useful) head_h = pose_data.get("head_h", {}).get(chosen) drop_pct = None if head_h and head_h > 0: # lead vs trail by handedness; y increases downward in image coords lead_y = ls[1] if handedness == 'right' else rs[1] trail_y = rs[1] if handedness == 'right' else ls[1] dy_sho = (lead_y - trail_y) drop_pct = 100.0 * (dy_sho / head_h) dbg(f"[TORSO] lead-shoulder drop = {drop_pct:.1f}% head") # Trust but verify debug printout dbg(f"[SB] final torso_sidebend={sidebend_deg:.1f}° at frame={chosen}") return { "metric": "torso_side_bend_deg", "value": sidebend_deg, "quality": "ok", "details": { "frame": chosen, "roll_used_deg": roll_deg, "shoulder_drop_pct_head": drop_pct } } def calculate_shoulder_tilt_at_impact(*args, **kwargs): """ DEPRECATED: Shoulder tilt removed due to occlusion instability. Returns torso side-bend instead for backward compatibility. """ dbg("[DEPRECATION] Shoulder tilt removed. Returning torso side-bend instead.") res = calculate_torso_sidebend_at_impact(*args, **kwargs) # keep old key for downstream consumers if needed: return res["value"] # Return just the numeric value for compatibility def _find_impact_frame(pose_data, world_landmarks=None): """Find impact frame using peak hand speed if not provided in swing phases""" if not world_landmarks: return None frames = sorted(world_landmarks.keys()) if len(frames) < 3: return None speeds = [] for i in range(1, len(frames)): f_curr = frames[i] f_prev = frames[i-1] curr_frame = world_landmarks.get(f_curr) prev_frame = world_landmarks.get(f_prev) if (curr_frame and len(curr_frame) > 16 and prev_frame and len(prev_frame) > 16 and curr_frame[16] and prev_frame[16]): # Right wrist curr_wrist = curr_frame[16] prev_wrist = prev_frame[16] # Calculate 3D speed dx = curr_wrist[0] - prev_wrist[0] dy = curr_wrist[1] - prev_wrist[1] dz = curr_wrist[2] - prev_wrist[2] speed = math.sqrt(dx*dx + dy*dy + dz*dz) speeds.append((f_curr, speed)) if speeds: # Find peak speed frame peak_frame = max(speeds, key=lambda x: x[1])[0] # Impact is typically 1 frame before peak speed impact_frame = peak_frame - 1 return impact_frame if impact_frame in frames else peak_frame return None def _target_hat_from_toes_world(world_landmarks, setup_frames, handedness='right'): """Build stable target axis from toe line at address with outlier trimming""" LEFT_TOE, RIGHT_TOE = 31, 32 toe_vecs = [] for f in setup_frames: if f in world_landmarks: frame = world_landmarks[f] if frame and len(frame) > 32: try: Ltoe = frame[LEFT_TOE] Rtoe = frame[RIGHT_TOE] if Ltoe and Rtoe and len(Ltoe) >= 3 and len(Rtoe) >= 3: toe_vec = (Rtoe[0]-Ltoe[0], Rtoe[2]-Ltoe[2]) if (toe_vec[0]**2 + toe_vec[1]**2) > 1e-6: toe_vecs.append(toe_vec) except Exception: continue if not toe_vecs: dbg("target_axis - NO VALID TOE VECTORS, using default") return (1.0, 0.0) # Drop 20% widest-angle outliers before median toe_vecs = np.array(toe_vecs, dtype=float) angles = np.degrees(np.arctan2(toe_vecs[:,1], toe_vecs[:,0])) med, dev = np.median(angles), np.abs(angles - np.median(angles)) keep = dev <= np.percentile(dev, 80) target_hat = unit2(np.median(toe_vecs[keep], axis=0)) # Optional: flip sign for consistency if you want +X toward target for left-handed if handedness == 'left': target_hat = (-target_hat[0], -target_hat[1]) return target_hat def _pick_stable_setup_frames(sm, swing_phases, max_frames=6): """Pick first address frames with low pelvis motion (reduces waggle bias).""" cand = swing_phases.get("setup", [])[:12] if not cand: return [] def pelvis_ctr_px(f): lm = sm.get(f) if not lm or not lm[23] or not lm[24]: return None return np.array([(lm[23][0]+lm[24][0])*0.5, (lm[23][1]+lm[24][1])*0.5], float) centers = [(f, pelvis_ctr_px(f)) for f in cand] centers = [(f, c) for f,c in centers if c is not None] if len(centers) < 2: return [f for f,_ in centers][:max_frames] # speed threshold: keep earliest frames whose step-to-step pelvis motion is small speeds = [] for i in range(1, len(centers)): f0,c0 = centers[i-1]; f1,c1 = centers[i] speeds.append((f1, float(np.linalg.norm(c1-c0)))) if not speeds: return [centers[0][0]] # pick prefix until motion inflates (waggle begins) thr = max(1.5, np.median([s for _,s in speeds])*2.5) keep = [centers[0][0]] for f,s in speeds: if s <= thr and len(keep) < max_frames: keep.append(f) else: break return keep def _build_foot_only_homography(sm, setup_frames): """ Build ground-plane H from 4 foot points only (no hips). MediaPipe indices: 27=L_heel, 28=R_heel, 29=L_ankle, 30=R_ankle, 31=L_foot_index, 32=R_foot_index Image pts use (31,32) for toes or (29,30) for ankles as fallback. Ground pts are a rectangle in arbitrary inches; scale will be calibrated later. """ import cv2 for f in setup_frames: lm = sm.get(f) if not lm: continue needed = [29,30,31,32] if any((i>=len(lm) or lm[i] is None or lm[i][2] < 0.5) for i in needed): continue # Image points - use consistent ankle/toe mapping img = np.array([ [lm[29][0], lm[29][1]], # L_ankle -> treat as heel [lm[30][0], lm[30][1]], # R_ankle -> heel [lm[31][0], lm[31][1]], # L_foot_index -> toe [lm[32][0], lm[32][1]], # R_foot_index -> toe ], dtype=np.float32) # Ground rectangle (left/right must match the image ordering above) ground = np.array([ [-8.0, 0.0], # L heel [ 8.0, 0.0], # R heel [-8.0, 11.0], # L toe [ 8.0, 11.0], # R toe ], dtype=np.float32) H, mask = cv2.findHomography(img, ground, cv2.RANSAC, 2.0) if H is None: continue det = abs(np.linalg.det(H[:2,:2])) if det > 1e-5: return H, True, f return None, False, None def _simple_pixel_sway_fallback(sm, setup_frames, top_frame, swing_phases=None, world_landmarks=None): """Improved pixel-based sway fallback with world landmark scale calibration""" def get_lm(frame_idx): return sm.get(frame_idx) # pick a setup frame that has feet + hips setup_ok = [f for f in setup_frames if get_lm(f) and get_lm(f)[23] and get_lm(f)[24] and get_lm(f)[31] and get_lm(f)[32]] if not setup_ok: return 0.0 f0 = setup_ok[0] lm0 = get_lm(f0) # pelvis centers (pixels) def pelvis_ctr_px(f): lm = get_lm(f) if not lm or not lm[23] or not lm[24]: return None return np.array([(lm[23][0] + lm[24][0]) * 0.5, (lm[23][1] + lm[24][1]) * 0.5], dtype=float) setup_ctrs = [pelvis_ctr_px(f) for f in setup_ok] setup_ctrs = [c for c in setup_ctrs if c is not None] if not setup_ctrs: return 0.0 setup_ctr = np.median(np.stack(setup_ctrs), axis=0) top_ctr = pelvis_ctr_px(top_frame) if top_ctr is None: return 0.0 # toe vector in pixels from the same setup frame toe_v = np.array([lm0[32][0] - lm0[31][0], lm0[32][1] - lm0[31][1]], dtype=float) n = np.linalg.norm(toe_v) if n < 1e-6: # fallback to horizontal approximation if toes are missing toe_hat_px = np.array([1.0, 0.0]) else: toe_hat_px = toe_v / n # Target axis (toe line itself) in pixel space target_hat_px = toe_hat_px / (np.linalg.norm(toe_hat_px) + 1e-9) # --- NEW: sign disambiguation using address -> impact pelvis centers in pixel space --- if swing_phases and swing_phases.get("impact"): f_imp = swing_phases["impact"][0] # median address pelvis center in px addr_ctrs = [pelvis_ctr_px(f) for f in setup_ok] addr_ctrs = [c for c in addr_ctrs if c is not None] imp_ctr = pelvis_ctr_px(f_imp) if addr_ctrs and imp_ctr is not None: addr_ctr = np.median(np.stack(addr_ctrs), axis=0) fwd_vec_px = imp_ctr - addr_ctr if float(np.dot(fwd_vec_px, target_hat_px)) < 0.0: dbg("[SWAY] Flipped target_hat_px based on addr→impact motion") target_hat_px = -target_hat_px delta_px = top_ctr - setup_ctr sway_px = float(np.dot(delta_px, target_hat_px)) # Improved px→inch scale using world landmarks with consistency check px_per_inch_candidates = [] default_px_per_inch = 10.0 if world_landmarks is not None: # Try to get px_per_inch from multiple setup frames for f in setup_ok: if f in world_landmarks: try: wf = world_landmarks[f] lm = get_lm(f) L_FOOT_INDEX, R_FOOT_INDEX = 31, 32 # foot_index points (toe tips) Lt, Rt = wf[L_FOOT_INDEX], wf[R_FOOT_INDEX] if Lt and Rt and lm and lm[31] and lm[32]: # 3D toe spacing in meters → inches d_m = math.hypot(Rt[0]-Lt[0], Rt[2]-Lt[2]) d_in_true = d_m * 39.3701 # Pixel toe spacing for this frame toe_v_frame = np.array([lm[32][0] - lm[31][0], lm[32][1] - lm[31][1]], dtype=float) d_px = float(np.linalg.norm(toe_v_frame)) if d_px > 1e-3 and d_in_true > 1e-3: candidate = d_px / d_in_true # Sanity check: reasonable range for px_per_inch (5-25 depending on resolution/crop) if 5.0 <= candidate <= 25.0: px_per_inch_candidates.append(candidate) dbg(f"pixel_fallback: frame {f} px_per_inch={candidate:.2f}") except Exception: continue # Apply consistency check: reject frames with >10% deviation from median if len(px_per_inch_candidates) > 1: initial_median = np.median(px_per_inch_candidates) consistent_candidates = [] for val in px_per_inch_candidates: deviation_pct = abs(val - initial_median) / initial_median * 100 if deviation_pct <= 10.0: # Within 10% of median consistent_candidates.append(val) else: dbg(f"pixel_fallback: rejecting inconsistent px_per_inch={val:.2f} (deviation={deviation_pct:.1f}%)") px_per_inch_candidates = consistent_candidates # Use median of candidates if available, otherwise use default if px_per_inch_candidates: px_per_inch = float(np.median(px_per_inch_candidates)) dbg(f"pixel_fallback: median px_per_inch={px_per_inch:.2f} from {len(px_per_inch_candidates)} consistent frames") else: px_per_inch = default_px_per_inch dbg(f"pixel_fallback: using default px_per_inch={px_per_inch:.2f}") sway_inches = sway_px / px_per_inch # Apply same robust baseline approach to pixel fallback if len(setup_ctrs) >= 2: setup_x_px = [c[0] for c in setup_ctrs] t = np.arange(len(setup_x_px)) m, b = np.polyfit(t, setup_x_px, 1) if len(setup_x_px) >= 2 else (0, setup_x_px[0]) residuals = np.array(setup_x_px) - (m * t + b) mad_residuals = np.median(np.abs(residuals - np.median(residuals))) jitter_px = 1.4826 * mad_residuals jitter_in = jitter_px / px_per_inch else: jitter_in = 0.1 # Minimal fallback # Calculate trust ratio sway_jitter_ratio = abs(sway_inches) / max(jitter_in, 1e-6) trust_flag = "trusted" if sway_jitter_ratio >= 1.5 else "low-trust" # Normalize by stance width if using pixel fallback and world landmarks available normalized_sway = None stance_width_in = None if world_landmarks: stance_widths = [] for f in setup_ok: if f in world_landmarks: try: Lt_w, Rt_w = world_landmarks[f][31], world_landmarks[f][32] toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3])) stance_widths.append(float(toe_world_m * 39.3701)) # Convert to inches except: pass if stance_widths: stance_width_in = np.median(stance_widths) if 14 <= stance_width_in <= 28: normalized_sway = sway_inches / stance_width_in # Enhanced pixel fallback logging matching main function norm_str = f"{normalized_sway:.3f}" if normalized_sway is not None else "NA" dbg(f"pixel_fallback: dx={sway_inches:.2f}in, norm={norm_str}, jitter={jitter_in:.2f}in, ratio={sway_jitter_ratio:.1f}, trust={trust_flag}") if stance_width_in: dbg(f"pixel_fallback: stance_width={stance_width_in:.1f}\", target_normalized≈0.16 for pros") return sway_inches def _subframe_impact_refinement(world_landmarks: Dict[int, List], impact_candidate: int) -> float: """ Refine impact timing to sub-frame precision using parabolic fit to hand speed Fits parabola around the impact candidate and returns fractional frame timestamp """ try: # Get frames around impact candidate for parabolic fit test_frames = [f for f in range(impact_candidate-2, impact_candidate+3) if f in world_landmarks] if len(test_frames) < 3: return float(impact_candidate) # Not enough data for refinement # Calculate hand speeds around impact speeds = [] frame_indices = [] for i in range(1, len(test_frames)): f_curr = test_frames[i] f_prev = test_frames[i-1] curr_frame = world_landmarks[f_curr] prev_frame = world_landmarks[f_prev] if (curr_frame and len(curr_frame) > 16 and prev_frame and len(prev_frame) > 16 and curr_frame[16] and prev_frame[16]): curr_wrist = curr_frame[16] prev_wrist = prev_frame[16] # Calculate 3D speed dx = curr_wrist[0] - prev_wrist[0] dy = curr_wrist[1] - prev_wrist[1] dz = curr_wrist[2] - prev_wrist[2] speed = math.sqrt(dx*dx + dy*dy + dz*dz) speeds.append(speed) frame_indices.append(f_curr) if len(speeds) < 3: return float(impact_candidate) # Fit parabola to speed curve: speed = a*t^2 + b*t + c # Find peak (maximum) of parabola t = np.array(frame_indices, dtype=float) s = np.array(speeds) # Polynomial fit (degree 2) coeffs = np.polyfit(t, s, 2) a, b, c = coeffs if a >= 0: # Parabola opens upward → no maximum return float(impact_candidate) # Maximum at t = -b/(2a) peak_frame_fractional = -b / (2*a) # Impact happens just before peak (typically 0.5-1.0 frames before) # Use the original logic of peak-1, but with fractional precision impact_fractional = peak_frame_fractional - 0.7 # Refined offset # Constrain to reasonable range around original estimate if abs(impact_fractional - impact_candidate) > 2.0: impact_fractional = float(impact_candidate) dbg(f"subframe_refinement - parabola_coeffs=[{a:.6f}, {b:.6f}, {c:.6f}]") dbg(f"subframe_refinement - peak_frame={peak_frame_fractional:.2f}, impact_refined={impact_fractional:.2f}") return impact_fractional except Exception as e: dbg(f"subframe_refinement - failed: {e}, using integer frame") return float(impact_candidate) def _address_frames(swing_phases: Dict[str, List]) -> List[int]: """Get address/setup frames""" return swing_phases.get("setup", [])[:10] def _impact_frame(world_landmarks: Dict[int, List], swing_phases: Dict[str, List]) -> Optional[int]: """Find impact using peak lead-hand speed - 1 frame with inline refinement""" downswing_frames = swing_phases.get("downswing", []) impact_frames = swing_phases.get("impact", []) analysis_frames = sorted(downswing_frames + impact_frames) if len(analysis_frames) < 5: return impact_frames[0] if impact_frames else None # Calculate hand speeds hand_speeds = [] valid_frames = [] for i in range(1, len(analysis_frames)): f_curr = analysis_frames[i] f_prev = analysis_frames[i-1] if f_curr in world_landmarks and f_prev in world_landmarks: curr_frame = world_landmarks[f_curr] prev_frame = world_landmarks[f_prev] if (curr_frame and len(curr_frame) > 16 and prev_frame and len(prev_frame) > 16): # Use lead hand (left wrist for right-handed golfer) wrist_idx = 15 # Lead wrist for right-handed curr_wrist = curr_frame[wrist_idx] prev_wrist = prev_frame[wrist_idx] if (curr_wrist and len(curr_wrist) >= 3 and prev_wrist and len(prev_wrist) >= 3): dx = curr_wrist[0] - prev_wrist[0] dy = curr_wrist[1] - prev_wrist[1] dz = curr_wrist[2] - prev_wrist[2] speed = math.sqrt(dx*dx + dy*dy + dz*dz) hand_speeds.append(speed) valid_frames.append(f_curr) if len(hand_speeds) >= 3: # Find peak speed frame max_speed_idx = np.argmax(hand_speeds) peak_speed_frame = valid_frames[max_speed_idx] # Initial estimate: peak speed - 1 frame impact_candidate = peak_speed_frame if max_speed_idx > 0: impact_candidate = valid_frames[max_speed_idx - 1] # Clamp to ±2 frames around original estimate original = valid_frames[max_speed_idx - 1] if max_speed_idx > 0 else peak_speed_frame impact_candidate = max(original - 2, min(original + 2, impact_candidate)) return impact_candidate return impact_frames[0] if impact_frames else None # Removed complex 2D fallback function - using simple direct measurement only # ================== HIP-SHOULDER SEPARATION METRIC ================== def _angle_deg(p_left: np.ndarray, p_right: np.ndarray) -> float: """ Angle of the line Left->Right relative to screen-horizontal, in degrees. Image coords: x right+, y down+. """ dx = float(p_right[0] - p_left[0]) dy = float(p_right[1] - p_left[1]) return np.degrees(np.arctan2(dy, dx)) def _wrap_deg(a: float) -> float: """Wrap to (-180, 180].""" a = (a + 180.0) % 360.0 - 180.0 return a def _span(p_left: np.ndarray, p_right: np.ndarray) -> float: """Euclidean span in pixels.""" return float(np.hypot(p_right[0] - p_left[0], p_right[1] - p_left[1])) def _trimmed_median(values: List[float], trim: float = 0.2) -> float: """Robust center estimate.""" if not values: return 0.0 arr = np.sort(np.asarray(values, dtype=float)) n = len(arr) k = int(np.floor(trim * n)) arr = arr[k: n - k] if n - 2*k > 0 else arr return float(np.median(arr)) def _circular_mean_deg(values: List[float]) -> float: """Circular mean for angle values to avoid wrap artifacts.""" if not values: return 0.0 a = np.radians(values) return float(np.degrees(np.arctan2(np.mean(np.sin(a)), np.mean(np.cos(a))))) def estimate_global_roll_deg( lm_seq: np.ndarray, setup_idxs: List[int], use_eyes: bool = True ) -> float: """ Estimate camera roll from early setup frames using eyes + shoulders only. Uses circular mean to avoid wrap artifacts. Excludes hips to prevent bias from pelvis rotation. Returns a single roll angle in degrees (to subtract from later line angles). """ eye_angles, sho_angles = [], [] for t in setup_idxs: if t >= lm_seq.shape[0]: continue lm = lm_seq[t] try: if use_eyes and len(lm) > max(L_EYE, R_EYE): eye_angles.append(_angle_deg(lm[L_EYE], lm[R_EYE])) if len(lm) > max(L_SHO, R_SHO): sho_angles.append(_angle_deg(lm[L_SHO], lm[R_SHO])) except Exception: continue # Combine all angles for circular mean (exclude hips to avoid pelvis bias) all_angles = [] if eye_angles: all_angles.extend(eye_angles) if sho_angles: all_angles.extend(sho_angles) if not all_angles: return 0.0 # Use circular mean to avoid wrap artifacts like 171° instead of -18° return _circular_mean_deg(all_angles) def hip_shoulder_separation_deg( lm_seq: np.ndarray, idx: int, setup_idxs: List[int], handed: str = "R", min_span_px: float = 20.0, smooth_half_window: int = 2, # 5-frame median around idx apply_yaw_correction: bool = True ) -> Dict[str, float]: """ Compute signed hip-shoulder separation at a given frame. Positive (right-handed) => hips more open than shoulders (hips leading). For left-handed, sign is flipped so positive still means hips leading. New features: - Temporal smoothing: Uses median over ±smooth_half_window frames around idx - Yaw compensation: Corrects for camera angle by comparing shoulder spans - Geometry confidence: Flags when foreshortening may affect accuracy Returns: dict with: 'hip_shoulder_sep_deg': Corrected separation angle 'hip_shoulder_sep_abs_deg': Absolute value of separation 'confidence': 0-1 quality of landmark detection 'span_ratio': Shoulder span at idx vs address (for foreshortening check) 'geometry_ok': True if camera appears square to player """ T = lm_seq.shape[0] # Neighborhood for temporal median smoothing lo = max(0, idx - smooth_half_window) hi = min(T - 1, idx + smooth_half_window) window = range(lo, hi + 1) # Estimate roll from setup roll_deg = estimate_global_roll_deg(lm_seq, setup_idxs, use_eyes=True) hip_angles, sho_angles, confs = [], [], [] for t in window: if t >= T: continue lm = lm_seq[t] if len(lm) <= max(L_SHO, R_SHO, L_HIP, R_HIP): continue Ls, Rs = lm[L_SHO], lm[R_SHO] Lh, Rh = lm[L_HIP], lm[R_HIP] # Confidence via spans span_sho = _span(Ls, Rs) span_hip = _span(Lh, Rh) good = float((span_sho >= min_span_px) and (span_hip >= min_span_px)) confs.append(good) # Raw angles a_sho = _angle_deg(Ls, Rs) - roll_deg a_hip = _angle_deg(Lh, Rh) - roll_deg # Wrap both post de-roll a_sho = _wrap_deg(a_sho) a_hip = _wrap_deg(a_hip) sho_angles.append(a_sho) hip_angles.append(a_hip) if not sho_angles or not hip_angles: return dict(hip_shoulder_sep_deg=0.0, hip_shoulder_sep_abs_deg=0.0, confidence=0.0) # Temporal median to reduce jitter a_sho_med = float(np.median(sho_angles)) a_hip_med = float(np.median(hip_angles)) conf = float(np.mean(confs)) # 0..1 proportion of frames with decent span # Separation: hips minus shoulders sep = _wrap_deg(a_hip_med - a_sho_med) # DEBUG PRINT: Add debugging info for the angles and separation print(f"[DBG] roll={roll_deg:.1f}°, a_hip_med={a_hip_med:.1f}°, a_sho_med={a_sho_med:.1f}°, sep={sep:.1f}°") # FORESHORTENING CHECK: Compare shoulder span from address to impact span_addr = None span_ratio = 1.0 geom_ok = True severe_foreshortening = False if apply_yaw_correction and setup_idxs: try: # Calculate median shoulder and hip spans during address addr_sho_spans, addr_hip_spans = [], [] for t in setup_idxs: if t < lm_seq.shape[0]: lm = lm_seq[t] if len(lm) > max(L_SHO, R_SHO): addr_sho_spans.append(_span(lm[L_SHO], lm[R_SHO])) if len(lm) > max(L_HIP, R_HIP): addr_hip_spans.append(_span(lm[L_HIP], lm[R_HIP])) if addr_sho_spans: span_addr_sho = np.median(addr_sho_spans) span_addr_hip = np.median(addr_hip_spans) if addr_hip_spans else span_addr_sho # Current frame spans lm_current = lm_seq[idx] if idx < lm_seq.shape[0] else None if lm_current is not None and len(lm_current) > max(L_SHO, R_SHO): span_imp_sho = _span(lm_current[L_SHO], lm_current[R_SHO]) span_imp_hip = _span(lm_current[L_HIP], lm_current[R_HIP]) if len(lm_current) > max(L_HIP, R_HIP) else span_imp_sho # Calculate ratios and use the better (less compressed) one ratio_sho = span_imp_sho / (span_addr_sho + 1e-6) ratio_hip = span_imp_hip / (span_addr_hip + 1e-6) span_ratio = max(ratio_sho, ratio_hip) # Use less-compressed ratio print(f"[DBG] shoulder_span addr={span_addr_sho:.1f}px, impact={span_imp_sho:.1f}px, ratio_sho={ratio_sho:.2f}") print(f"[DBG] hip_span addr={span_addr_hip:.1f}px, impact={span_imp_hip:.1f}px, ratio_hip={ratio_hip:.2f}") print(f"[DBG] using_ratio={span_ratio:.2f} (better of sho/hip)") # Apply clamping to realistic range (0.7-1.0) to avoid underestimation ratio_clamped = min(1.0, max(0.7, span_ratio)) sep_raw = sep sep = sep / ratio_clamped print(f"[DBG] sep_raw={sep_raw:.1f}°, sep_corr={sep:.1f}° (ratio_clamped={ratio_clamped:.2f})") # Geometry confidence flag - flag low-confidence results when span ratio <0.5 geom_ok = span_ratio >= 0.7 # reasonable geometry severe_foreshortening = span_ratio < 0.5 # flag low-confidence results except Exception as e: print(f"[DBG] Foreshortening calculation failed: {e}") # Handedness normalization: keep "positive = hips leading" for both if handed.upper().startswith("L"): sep *= -1.0 # Adjust confidence based on foreshortening severity final_conf = conf if severe_foreshortening: final_conf *= 0.2 # Very low confidence for severe foreshortening (span ratio <0.5) elif not geom_ok: final_conf *= 0.6 # Reduced confidence for geometry issues (span ratio <0.7) return dict( hip_shoulder_sep_deg=float(sep), hip_shoulder_sep_abs_deg=float(abs(sep)), confidence=float(final_conf), span_ratio=float(span_ratio), geometry_ok=bool(geom_ok), severe_foreshortening=bool(severe_foreshortening) ) def metric_at_address_top_impact( lm_seq: np.ndarray, address_idxs: List[int], top_idx: Optional[int], impact_idx: int, handed: str = "R" ) -> Dict[str, Dict[str, float]]: """ Compute the metric at address (median over address_idxs), top (optional), and impact. address_idxs: e.g., first 8–12 frames while static at setup. """ out = {} # Address: use address window both for setup roll and target idx smoothing addr_idx = int(np.median(address_idxs)) out["address"] = hip_shoulder_separation_deg( lm_seq, addr_idx, setup_idxs=address_idxs, handed=handed ) if top_idx is not None: out["top"] = hip_shoulder_separation_deg( lm_seq, top_idx, setup_idxs=address_idxs, handed=handed ) out["impact"] = hip_shoulder_separation_deg( lm_seq, impact_idx, setup_idxs=address_idxs, handed=handed ) return out def calculate_hip_shoulder_separation_at_impact( pose_data: Dict[int, List], swing_phases: Dict[str, List], handedness: str = 'right' ) -> Dict[str, float]: """ Calculate hip-shoulder separation at impact using the new metric. Args: pose_data: Dictionary mapping frame indices to pose keypoints swing_phases: Dictionary mapping phase names to lists of frame indices handedness: 'right' or 'left' handed player Returns: Dict with hip_shoulder_sep_deg, hip_shoulder_sep_abs_deg, confidence """ # Convert pose_data to numpy array format expected by the metric if not pose_data: return {"hip_shoulder_sep_deg": None, "hip_shoulder_sep_abs_deg": None, "confidence": 0.0} # Get frame indices frame_indices = sorted(pose_data.keys()) max_frame_idx = max(frame_indices) # Create numpy array with shape [T, N, 2] where N >= 33 (Mediapipe format) lm_seq = np.zeros((max_frame_idx + 1, 33, 2), dtype=float) # Fill the array with pose data for frame_idx, landmarks in pose_data.items(): if landmarks and len(landmarks) >= 25: # Need at least hip landmarks for i, lm in enumerate(landmarks): if i < 33 and lm and len(lm) >= 2: lm_seq[frame_idx, i, 0] = lm[0] # x lm_seq[frame_idx, i, 1] = lm[1] # y # Get swing phase indices setup_frames = swing_phases.get("setup", []) impact_frames = swing_phases.get("impact", []) if not setup_frames or not impact_frames: dbg("[HIP-SHO] Missing setup or impact frames") return {"hip_shoulder_sep_deg": None, "hip_shoulder_sep_abs_deg": None, "confidence": 0.0} # Use first 8-10 setup frames for address address_idxs = setup_frames[:min(10, len(setup_frames))] impact_idx = impact_frames[0] # Convert handedness format handed = "R" if handedness.lower().startswith('r') else "L" try: # Calculate the metric at impact (5-frame median for temporal smoothing) result = hip_shoulder_separation_deg( lm_seq=lm_seq, idx=impact_idx, setup_idxs=address_idxs, handed=handed, smooth_half_window=2 # 5-frame median around impact (±2 frames) ) # Enhanced debug output with geometry information geom_warning = "" if result.get('severe_foreshortening', False): geom_warning = " (severely underest. - severe foreshortening)" elif not result.get('geometry_ok', True): geom_warning = " (underest. likely; yaw/foreshortening)" dbg(f"[HIP-SHO] Impact separation: {result['hip_shoulder_sep_deg']:.1f}°, confidence: {result['confidence']:.2f}, span_ratio: {result.get('span_ratio', 1.0):.2f}{geom_warning}") return result except Exception as e: dbg(f"[HIP-SHO] Calculation failed: {e}") return {"hip_shoulder_sep_deg": None, "hip_shoulder_sep_abs_deg": None, "confidence": 0.0} # ================== END HIP-SHOULDER SEPARATION METRIC ================== def calculate_hip_sway_at_top(pose_data: Dict[int, List], swing_phases: Dict[str, List], world_landmarks: Optional[Dict[int, List]] = None) -> Union[float, None]: # Apply additional temporal smoothing (Savitzky-Golay or EWMA) before measurement sm = smooth_landmarks(pose_data, alpha=0.25) # Stronger smoothing for hip sway setup_frames = _pick_stable_setup_frames(sm, swing_phases, max_frames=6) if not setup_frames: return None backswing_frames = swing_phases.get("backswing", []) if not backswing_frames: return None top_frame = _pick_top_by_wrist_height(sm, backswing_frames) or backswing_frames[-1] H, ok, H_frame = _build_foot_only_homography(sm, setup_frames) if not ok or H is None: # simple pixel fallback with world toes scale if present return _simple_pixel_sway_fallback(sm, setup_frames, top_frame, swing_phases, world_landmarks) import cv2 def warp2(pts): A = np.array(pts, dtype=np.float32).reshape(1, -1, 2) return cv2.perspectiveTransform(A, H)[0] # forward axis = perp to toe line in ground plane toe_vecs = [] for f in setup_frames: lm = sm.get(f) if not lm or not lm[31] or not lm[32]: continue g = warp2([[lm[31][0], lm[31][1]], [lm[32][0], lm[32][1]]]) v = g[1] - g[0] n = np.linalg.norm(v) if n > 1e-6: toe_vecs.append(v/n) if not toe_vecs: return None toe_hat = np.median(np.stack(toe_vecs), axis=0); toe_hat /= (np.linalg.norm(toe_hat)+1e-9) fwd_hat = np.array([-toe_hat[1], toe_hat[0]]) fwd_hat /= (np.linalg.norm(fwd_hat)+1e-9) # sign disambiguation with addr→impact pelvis motion (optional but stable) imp = swing_phases.get("impact", []) cand = imp[0] if imp else backswing_frames[-1] def pelvis_ctr_g(f): lm = sm.get(f); if not lm or not lm[23] or not lm[24]: return None g = warp2([[lm[23][0], lm[23][1]], [lm[24][0], lm[24][1]]]); return (g[0]+g[1])/2 addr_ctrs = [pelvis_ctr_g(f) for f in setup_frames]; addr_ctrs = [c for c in addr_ctrs if c is not None] imp_ctr = pelvis_ctr_g(cand) if addr_ctrs and imp_ctr is not None: addr_ctr = np.median(np.stack(addr_ctrs), axis=0) if float(np.dot(imp_ctr-addr_ctr, fwd_hat)) < 0: dbg("[SWAY] Flipped fwd_hat based on addr→impact pelvis motion") fwd_hat = -fwd_hat # Establish robust address baseline to stop drift from inflating jitter setup_ctr = np.median(np.stack(addr_ctrs), axis=0) # Robust baseline with detrending: use tight window around address addr_ctrs_array = np.stack(addr_ctrs) addr_x_positions = addr_ctrs_array[:, 0] # X coordinates only # Light detrend to remove slow drift t = np.arange(len(addr_x_positions)) if len(addr_x_positions) >= 2: m, b = np.polyfit(t, addr_x_positions, 1) # Linear fit residuals = addr_x_positions - (m * t + b) # Robust jitter using MAD (Median Absolute Deviation) mad_residuals = np.median(np.abs(residuals - np.median(residuals))) setup_jitter_px = 1.4826 * mad_residuals # MAD to std conversion addr_baseline_px = b # Robust address baseline else: setup_jitter_px = 0.0 addr_baseline_px = addr_x_positions[0] if len(addr_x_positions) > 0 else setup_ctr[0] dbg(f"[SWAY] Robust baseline: jitter_px={setup_jitter_px:.2f} (was std-based), addr_baseline={addr_baseline_px:.1f}") setup_jitter = setup_jitter_px # Use robust jitter top_ctr = pelvis_ctr_g(top_frame) if top_ctr is None: return None # Apply jitter-aware correction - reduce small movements that may be noise raw_delta = top_ctr - setup_ctr if np.linalg.norm(raw_delta) < setup_jitter * 1.5: dbg(f"[SWAY] Movement ({np.linalg.norm(raw_delta):.2f}) < 1.5x setup jitter ({setup_jitter:.2f}), applying correction") # Partial correction for movements smaller than jitter threshold correction_factor = max(0.0, (np.linalg.norm(raw_delta) - setup_jitter) / np.linalg.norm(raw_delta)) if np.linalg.norm(raw_delta) > 0 else 0.0 corrected_delta = raw_delta * correction_factor top_ctr = setup_ctr + corrected_delta # Validate vertical drift but DO NOT repick top to minimize it TOP_DRIFT_GU_WARN = 3.0 # warn if pelvis vertical drift > 3 gu, but DO NOT change top TOP_DRIFT_GU_REJECT = 6.0 # only reject if absurd vertical_drift = abs(top_ctr[1] - setup_ctr[1]) # Ground-plane Y difference if vertical_drift > TOP_DRIFT_GU_REJECT: dbg(f"[SWAY] vertical drift {vertical_drift:.1f} gu -> unreliable sway") return None elif vertical_drift > TOP_DRIFT_GU_WARN: dbg(f"[SWAY] vertical drift {vertical_drift:.1f} gu (warn)") # DO NOT repick top based on vertical drift - top must be picked by club kinematics # Ground-plane calibration: gu_per_inch stabilization with consistency check gu_per_inch_list = [] use_pixel_fallback = False for k in setup_frames: if world_landmarks and k in world_landmarks: try: # 1) World toe distance (meters -> inches) Lt_w, Rt_w = world_landmarks[k][31], world_landmarks[k][32] toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3])) toe_world_in = float(toe_world_m * 39.3701) # 2) Ground-plane toe distance (same frame, via homography) lmK = sm.get(k) if lmK and lmK[31] and lmK[32]: toe_g = warp2([ [lmK[31][0], lmK[31][1]], [lmK[32][0], lmK[32][1]], ]) # shape (2,2) toe_g_dist = float(np.linalg.norm(toe_g[0] - toe_g[1])) # ground units if toe_g_dist > 1e-3 and toe_world_in > 1e-3: gu_per_in = toe_g_dist / max(toe_world_in, 1e-6) # ground-units per inch # Homography plausibility check: stance width should be ~8–30 inches if 8.0 <= toe_world_in <= 30.0 and 0.1 <= gu_per_in <= 10.0: gu_per_inch_list.append(gu_per_in) dbg(f"[SWAY] frame {k}: toe_g={toe_g_dist:.2f} gu, toe_world={toe_world_in:.2f} in, gu_per_in={gu_per_in:.3f}") else: dbg(f"[SWAY] implausible frame {k}: toe_world_in={toe_world_in:.2f}, gu_per_in={gu_per_in:.3f}") except Exception as e: dbg(f"[SWAY] calibration failed for frame {k}: {e}") # Reject frames with inconsistent calibration (>10% deviation from median) if len(gu_per_inch_list) > 1: initial_median = np.median(gu_per_inch_list) consistent_values = [] for val in gu_per_inch_list: deviation_pct = abs(val - initial_median) / initial_median * 100 if deviation_pct <= 10.0: # Within 10% of median consistent_values.append(val) else: dbg(f"[SWAY] Rejecting inconsistent gu_per_in={val:.3f} (deviation={deviation_pct:.1f}%)") if consistent_values: gu_per_inch_list = consistent_values dbg(f"[SWAY] Kept {len(consistent_values)}/{len(gu_per_inch_list)} consistent calibration values") if gu_per_inch_list: gu_per_in = np.median(gu_per_inch_list) gu_per_in = float(np.clip(gu_per_in, 0.1, 10.0)) # ground-plane appropriate range use_pixel_fallback = False dbg(f"[SWAY] using ground calibration: median gu_per_in={gu_per_in:.3f} from {len(gu_per_inch_list)} frames") else: dbg("[SWAY] no consistent ground calibration; will try pixel fallback") use_pixel_fallback = True gu_per_in = 1.0 # Option B pixel fallback with consistency check if use_pixel_fallback and world_landmarks and setup_frames: px_per_inch_list = [] for k in setup_frames: if k in world_landmarks: try: Lt_w, Rt_w = world_landmarks[k][31], world_landmarks[k][32] toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3])) toe_world_in = float(toe_world_m * 39.3701) lmK = sm.get(k) if lmK and lmK[31] and lmK[32]: # Image pixel distance toe_px = float(np.linalg.norm(np.array(lmK[31][:2]) - np.array(lmK[32][:2]))) if toe_px > 1e-3 and toe_world_in > 1e-3: px_per_in = toe_px / max(toe_world_in, 1e-6) if 3.0 <= px_per_in <= 60.0: # image pixels appropriate range px_per_inch_list.append(px_per_in) dbg(f"[SWAY] FALLBACK frame {k}: toe_px={toe_px:.1f}, toe_world_in={toe_world_in:.2f}, px_per_in={px_per_in:.2f}") except: pass # Apply same consistency check to pixel fallback if len(px_per_inch_list) > 1: initial_median = np.median(px_per_inch_list) consistent_px_values = [] for val in px_per_inch_list: deviation_pct = abs(val - initial_median) / initial_median * 100 if deviation_pct <= 10.0: # Within 10% of median consistent_px_values.append(val) else: dbg(f"[SWAY] Rejecting inconsistent px_per_in={val:.2f} (deviation={deviation_pct:.1f}%)") px_per_inch_list = consistent_px_values if px_per_inch_list: gu_per_in = np.median(px_per_inch_list) # reuse variable name for simplicity dbg(f"[SWAY] using pixel fallback: median px_per_in={gu_per_in:.2f} from {len(px_per_inch_list)} frames") else: gu_per_in = 1.0 dbg("[SWAY] all calibration failed, using unit scale") inch_scale = 1.0 / gu_per_in if gu_per_in > 0 else 1.0 # Measure ∆x in ground units end-to-end for consistency # Convert pelvis positions to ground units setup_ctr_gu = np.array([addr_baseline_px, setup_ctr[1]]) # Use robust baseline top_ctr_gu = top_ctr # Two-stage denoising: apply rolling median + EWMA smoothing to top position # (Note: setup already detrended above) if len(addr_ctrs) >= 3: # Simple moving median for top position stability (if multiple candidates available) top_candidates = [] for f_nearby in range(max(setup_frames[0], top_frame-2), min(setup_frames[-1], top_frame+3)): if f_nearby in range(len(pose_data)) and f_nearby != top_frame: nearby_ctr = pelvis_ctr_g(f_nearby) if nearby_ctr is not None: top_candidates.append(nearby_ctr) if top_candidates and len(top_candidates) >= 2: top_positions = np.stack([top_ctr] + top_candidates) # Median filter followed by light smoothing top_x_values = top_positions[:, 0] top_x_median = np.median(top_x_values) # EWMA with conservative alpha on the median-filtered result alpha = 0.2 top_ctr_gu = np.array([top_x_median, top_ctr[1]]) dbg(f"[SWAY] Two-stage denoising: raw_top_x={top_ctr[0]:.2f}, median_filtered={top_x_median:.2f}") # Calculate sway in ground units, then convert to inches if top_ctr_gu is not None and setup_ctr_gu is not None and fwd_hat is not None: sway_gu = float(np.dot(top_ctr_gu - setup_ctr_gu, fwd_hat)) hip_sway_in = sway_gu * inch_scale else: dbg("[SWAY] Missing required data for sway calculation") return None # Convert jitter to inches using same scale setup_jitter_in = setup_jitter * inch_scale if setup_jitter is not None else 0.0 # Lightweight yaw/foreshortening correction using hip span ratio ratio_hip = 1.0 # Default if no correction available if world_landmarks and setup_frames: try: # Calculate hip span ratio from setup to top for yaw correction setup_hip_spans = [] for f in setup_frames[:3]: # Use first few setup frames if f in world_landmarks: wf = world_landmarks[f] if len(wf) > 24 and wf[23] and wf[24]: # L_HIP, R_HIP span = np.linalg.norm(np.array(wf[24][:3]) - np.array(wf[23][:3])) if 0.15 <= span <= 0.6: # Reasonable hip span in meters setup_hip_spans.append(span) if setup_hip_spans and top_frame in world_landmarks: wf_top = world_landmarks[top_frame] if len(wf_top) > 24 and wf_top[23] and wf_top[24]: top_hip_span = np.linalg.norm(np.array(wf_top[24][:3]) - np.array(wf_top[23][:3])) setup_hip_span = np.median(setup_hip_spans) ratio_hip = top_hip_span / setup_hip_span ratio_hip = np.clip(ratio_hip, 0.80, 1.00) # Conservative clamp dbg(f"[SWAY] Hip span ratio: {ratio_hip:.3f} (setup={setup_hip_span:.3f}m, top={top_hip_span:.3f}m)") except Exception as e: dbg(f"[SWAY] Hip span ratio calculation failed: {e}") # Apply yaw correction hip_sway_in_corrected = hip_sway_in / ratio_hip dbg(f"[SWAY] Yaw correction: raw={hip_sway_in:.2f}\", corrected={hip_sway_in_corrected:.2f}\" (ratio={ratio_hip:.3f})") # Use corrected value hip_sway_in = hip_sway_in_corrected # STANCE WIDTH NORMALIZATION: Calculate stance width for normalized sway stance_width_in = None if gu_per_inch_list and world_landmarks: # Use the same calibration frames to get stance width stance_widths = [] for k in setup_frames: if k in world_landmarks: try: Lt_w, Rt_w = world_landmarks[k][31], world_landmarks[k][32] toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3])) stance_widths.append(float(toe_world_m * 39.3701)) # Convert to inches except: pass if stance_widths: stance_width_in = np.median(stance_widths) # Normalize and log (don't change return type) normalized_sway = None if stance_width_in and 14 <= stance_width_in <= 28: # Reasonable stance width range normalized_sway = hip_sway_in / stance_width_in # Calculate trust ratio with robust jitter sway_jitter_ratio = abs(hip_sway_in) / max(setup_jitter_in, 1e-6) trust_flag = "trusted" if sway_jitter_ratio >= 1.5 else "low-trust" # Enhanced debug output with all the key metrics norm_str = f"{normalized_sway:.3f}" if normalized_sway is not None else "NA" dbg(f"[SWAY] dx={hip_sway_in:.2f}in, norm={norm_str}, jitter={setup_jitter_in:.2f}in, ratio={sway_jitter_ratio:.1f}") # Safe formatting for sway_gu which might be None in some edge cases sway_gu_str = f"{sway_gu:.2f}" if sway_gu is not None else "None" gu_per_in_str = f"{gu_per_in:.3f}" if gu_per_in is not None else "None" dbg(f"[SWAY] addr→top pelvis Δx={sway_gu_str}gu @ gu_per_in={gu_per_in_str}, trust={trust_flag}") if stance_width_in: dbg(f"[SWAY] stance_width={stance_width_in:.1f}\", target_normalized≈0.16 for pros") # Trust but verify debug printout hip_sway_str = f"{hip_sway_in:.1f}" if hip_sway_in is not None else "None" gu_per_in_final_str = f"{gu_per_in:.3f}" if gu_per_in is not None else "None" dbg(f"[SWAY] final hip_sway_in={hip_sway_str}\" with gu_per_in={gu_per_in_final_str}") # Return signed, un-clipped value to preserve direction; clip only for UI bands if desired return float(hip_sway_in) def _shaft_axis_hat(lm, handedness='right'): """Shaft axis proxy: line through wrists (trail->lead). Deterministic and stable.""" if not lm: return None L_WR, R_WR = 15, 16 if not lm[L_WR] or not lm[R_WR] or lm[L_WR][2] < 0.3 or lm[R_WR][2] < 0.3: return None # Build trail→lead vector based on handedness if handedness == 'right': # Right-handed: trail=16 (right wrist), lead=15 (left wrist) trail_wrist, lead_wrist = R_WR, L_WR else: # Left-handed: trail=15 (left wrist), lead=16 (right wrist) trail_wrist, lead_wrist = L_WR, R_WR v = np.array([lm[lead_wrist][0] - lm[trail_wrist][0], lm[lead_wrist][1] - lm[trail_wrist][1]], float) # trail->lead n = np.linalg.norm(v) return v / n if n > 1e-6 else None def _hinge2d_shaft(lm, handed='right', roll_deg=0.0): """Lead-forearm only wrist hinge = angle between lead forearm and shaft vectors. Softened gating and consistent lead-only measurement.""" if not lm: return None # Lead forearm only - consistent landmark selection EL, WR = (13, 15) if handed=='right' else (14, 16) # lead elbow/wrist if not lm[EL] or not lm[WR] or lm[EL][2] < 0.4 or lm[WR][2] < 0.4: # Relaxed visibility threshold return None # Lead forearm vector from elbow to wrist forearm_vec = np.array([lm[WR][0] - lm[EL][0], lm[WR][1] - lm[EL][1]], float) # Shaft vector using both wrists for more stable direction shaft_vec = _shaft_axis_hat(lm, handed) if shaft_vec is None: # Fallback: use wrist-to-wrist vector as shaft proxy if lm[15] and lm[16] and lm[15][2] >= 0.4 and lm[16][2] >= 0.4: # Build trail→lead vector based on handedness for consistency if handed == 'right': trail_wrist, lead_wrist = 16, 15 else: trail_wrist, lead_wrist = 15, 16 shaft_vec = np.array([lm[lead_wrist][0] - lm[trail_wrist][0], lm[lead_wrist][1] - lm[trail_wrist][1]], float) else: return None # De-roll both vectors using the provided roll correction cr, sr = math.cos(math.radians(-roll_deg)), math.sin(math.radians(-roll_deg)) def deroll(v): return np.array([cr*v[0] - sr*v[1], sr*v[0] + cr*v[1]], float) forearm_vec = deroll(forearm_vec) shaft_vec = deroll(shaft_vec) # Normalize vectors forearm_norm = np.linalg.norm(forearm_vec) shaft_norm = np.linalg.norm(shaft_vec) if forearm_norm < 1e-6 or shaft_norm < 1e-6: return None # Calculate angle between lead forearm and shaft cos_angle = float(np.clip(np.dot(forearm_vec, shaft_vec) / (forearm_norm * shaft_norm), -1.0, 1.0)) angle_between = math.degrees(math.acos(abs(cos_angle))) # Always positive angle # Report acute angle (angle between forearm and shaft) instead of obtuse # This puts results in the expected pro range (~50-70° acute) hinge_acute = angle_between hinge_obtuse = 180.0 - angle_between # Traditional radial deviation for debugging # Debug logging: keep both for debugging but standardize to acute dbg(f"[HINGE] acute={hinge_acute:.1f}°, obtuse={hinge_obtuse:.1f}° (reporting acute)") return float(hinge_acute) def _pelvis_ctr_ground_series(sm, frames, warp2): """Get pelvis centers in ground plane for given frames""" ctrs = [] for f in frames: lm = sm.get(f) if not lm or not lm[23] or not lm[24]: ctrs.append(None) continue g = warp2([[lm[23][0], lm[23][1]], [lm[24][0], lm[24][1]]]) ctrs.append(((g[0]+g[1])/2).astype(float)) return ctrs def _refine_top_by_lat_vel(sm, backswing_frames, warp2, target_hat_g, init_top): """Refine top frame using lateral pelvis velocity zero-crossing""" # compute pelvis centers along backswing and project onto target axis ctrs = _pelvis_ctr_ground_series(sm, backswing_frames, warp2) ts = [i for i,c in enumerate(ctrs) if c is not None] if len(ts) < 5: return init_top # not enough samples proj = np.array([float(np.dot(ctrs[i], target_hat_g)) if ctrs[i] is not None else np.nan for i in range(len(ctrs))]) # smooth (5-pt) k = 5 proj_s = np.convolve(np.nan_to_num(proj), np.ones(k)/k, mode='same') # finite diff vel = np.gradient(proj_s) # take frame near init_top where vel≈0 and |vel| is locally minimal (change of direction) idx0 = backswing_frames.index(init_top) if init_top in backswing_frames else len(backswing_frames)-1 win = range(max(0, idx0-4), min(len(vel), idx0+5)) # find zero-cross or min |vel| cand = min(win, key=lambda i: abs(vel[i])) return backswing_frames[cand] def _point_ground(frame_idx, sm, warp2, i): """Get ground-plane position of landmark i in frame frame_idx""" lm = sm.get(frame_idx) if not lm or not lm[i]: return None return warp2([[lm[i][0], lm[i][1]]])[0].astype(float) def _pick_p3_lead_arm_parallel(sm, backswing_frames, handed='right', tol_deg=15.0): """Pick the backswing frame where the lead shoulder→lead wrist vector is most horizontal.""" L_SHO, WR = (11, 15) if handed=='right' else (12, 16) best = None best_abs = 1e9 for f in backswing_frames: lm = sm.get(f) if not lm or not lm[L_SHO] or not lm[WR]: continue if lm[L_SHO][2] < 0.5 or lm[WR][2] < 0.5: continue dx = lm[WR][0] - lm[L_SHO][0] dy = lm[WR][1] - lm[L_SHO][1] if abs(dx) + abs(dy) < 1e-6: continue ang = abs(math.degrees(math.atan2(dy, dx))) # 0° is perfectly horizontal if ang < best_abs: best_abs, best = ang, f return best def _pick_top_by_wrist_height(sm, backswing, handed='right', min_vis=0.7): """Pick top frame by highest wrist position with improved validation""" WR = 15 if handed=='right' else 16 SH = 11 if handed=='right' else 12 # lead shoulder best, best_y = None, 1e9 candidates = [] consecutive_rejects = 0 max_consecutive_rejects = 10 # Stop after N consecutive rejects to reduce spam for f in backswing: lm = sm.get(f) if not lm or not lm[WR] or not lm[SH] or lm[WR][2] < min_vis or lm[SH][2] < min_vis: consecutive_rejects += 1 if consecutive_rejects >= max_consecutive_rejects: dbg(f"[WRIST] Skipping {max_consecutive_rejects}+ consecutive bad frames, jumping ahead") break continue # Reject frames with shoulder-to-wrist line near vertical (club laid off/occluded) # Calculate angle of shoulder-to-wrist line from horizontal dx = lm[WR][0] - lm[SH][0] dy = lm[WR][1] - lm[SH][1] if abs(dx) < 1e-6: # Perfectly vertical consecutive_rejects += 1 if consecutive_rejects >= max_consecutive_rejects: dbg(f"[WRIST] Skipping {max_consecutive_rejects}+ consecutive steep frames, jumping ahead") break continue angle_from_horizontal = abs(math.degrees(math.atan2(dy, dx))) # Unified gating constant - use everywhere that gates by shoulder-wrist angle SHO_WRIST_STEEP_MAX = 120.0 # was ~105 in some functions if angle_from_horizontal > SHO_WRIST_STEEP_MAX: consecutive_rejects += 1 if consecutive_rejects >= max_consecutive_rejects: dbg(f"[WRIST] Skipping {max_consecutive_rejects}+ consecutive steep frames, jumping ahead") break # Only log first few rejects to reduce spam if consecutive_rejects <= 3: dbg(f"[WRIST] Rejecting frame {f}: shoulder-wrist angle {angle_from_horizontal:.1f}° too steep") continue # Found a good frame, reset consecutive reject counter consecutive_rejects = 0 y = lm[WR][1] # smaller y = higher hand (screen coords) candidates.append((f, y, angle_from_horizontal)) if y < best_y: best_y, best = y, f # Additional validation: ensure chosen frame is not an outlier if best is not None and candidates: # Check if there are other reasonable candidates nearby in height height_tolerance = 20 # pixels reasonable_candidates = [c for c in candidates if abs(c[1] - best_y) <= height_tolerance] if len(reasonable_candidates) > 1: # Among height-similar candidates, prefer one with better shoulder-wrist geometry reasonable_candidates.sort(key=lambda x: x[2]) # Sort by angle (smaller is better) best_candidate = reasonable_candidates[0] best = best_candidate[0] dbg(f"[WRIST] Selected frame {best} with angle {best_candidate[2]:.1f}° from {len(reasonable_candidates)} candidates") return best def _pick_top_frame_hinge(sm, backswing_frames, handed='right', roll_deg=0.0): """Pick frame with maximum hinge in backswing, with visibility and geometry validation""" if not backswing_frames: return None SH = 11 if handed=='right' else 12 # lead shoulder WR = 15 if handed=='right' else 16 # lead wrist # Restrict to P3→P4 window by limiting to last 70% of backswing p3_start_idx = max(0, int(0.3 * len(backswing_frames))) p3_p4_frames = backswing_frames[p3_start_idx:] top_vals = [] for f in p3_p4_frames: lm = sm.get(f) if not lm or not lm[WR] or not lm[SH] or lm[WR][2] < 0.7 or lm[SH][2] < 0.7: continue # Reject frames with shoulder-to-wrist line near vertical (same logic as height-based) dx = lm[WR][0] - lm[SH][0] dy = lm[WR][1] - lm[SH][1] if abs(dx) < 1e-6: # Perfectly vertical continue angle_from_horizontal = abs(math.degrees(math.atan2(dy, dx))) SHO_WRIST_STEEP_MAX = 120.0 # Unified constant if angle_from_horizontal > SHO_WRIST_STEEP_MAX: # Use unified threshold continue h = _hinge2d_shaft(lm, handed, roll_deg) if h is not None: # Additional sanity check for hinge values (h is now acute angle) if 10.0 <= h <= 120.0: # plausible range for acute angles top_vals.append((f, h)) if not top_vals: return None # Return frame with maximum hinge top_frame = max(top_vals, key=lambda x: x[1])[0] return top_frame def calculate_wrist_hinge_at_top(pose_data: Dict[int, List], swing_phases: Dict[str, List], handedness: str = 'right') -> Union[Dict[str, float], None]: """Calculate wrist hinge using shaft-based method with both P3 and top measurements""" # Define indices locally to avoid hidden globals L_EYE, R_EYE = 2, 5 L_HIP, R_HIP = 23, 24 sm = smooth_landmarks(pose_data) backswing_frames = swing_phases.get("backswing", []) if not backswing_frames: # Fallback: use any available frames as "backswing" all_frames = sorted(pose_data.keys()) if all_frames: # Use middle third of available frames as "backswing" mid_start = len(all_frames) // 3 mid_end = 2 * len(all_frames) // 3 backswing_frames = all_frames[mid_start:mid_end] or all_frames else: return None # Truly no data # Reuse consensus roll from shoulder tilt function def consensus_roll(lm, H=None, W=None): cands = [] for iL, iR in [(L_EYE, R_EYE), (L_HIP, R_HIP)]: if lm[iL] and lm[iR]: dx = lm[iR][0] - lm[iL][0] dy = lm[iR][1] - lm[iL][1] if abs(dx)+abs(dy) > 1e-6: t = math.degrees(math.atan2(dy, dx)) t = ((t + 90.0) % 180.0) - 90.0 if abs(t) <= 20: cands.append(t) return float(np.median(cands)) if cands else 0.0 # Initial roll from setup frames (clipped conservatively) setup_frames = swing_phases.get("setup", [])[:8] setup_rolls = [consensus_roll(sm.get(f)) for f in setup_frames if sm.get(f)] setup_roll_deg = float(np.clip(np.median(setup_rolls) if setup_rolls else 0.0, -10.0, 10.0)) # More robust top frame selection: try multiple methods top_frame = _pick_top_by_wrist_height(sm, backswing_frames, handed=handedness) if not top_frame: top_frame = _pick_top_frame_hinge(sm, backswing_frames, handed=handedness, roll_deg=setup_roll_deg) if not top_frame and backswing_frames: # Last resort: use frame with max backswing index that has good landmarks for f in reversed(backswing_frames): lm = sm.get(f) EL, WR = (13, 15) if handedness=='right' else (14, 16) if lm and lm[EL] and lm[WR] and lm[EL][2] >= 0.5 and lm[WR][2] >= 0.5: top_frame = f dbg(f"[WRIST] Using fallback top frame: {f}") break # Ensure we never have None for top_frame if top_frame is None and backswing_frames: top_frame = backswing_frames[-1] # Compute local roll around top frame for better accuracy roll_deg = setup_roll_deg # Start with setup roll as baseline if top_frame is not None: # Get frames around top for local roll calculation local_window = [f for f in range(max(backswing_frames[0], top_frame-4), min(backswing_frames[-1], top_frame+5)+1) if f in backswing_frames] local_rolls = [consensus_roll(sm.get(f)) for f in local_window if sm.get(f)] if local_rolls: # Use wider clipping range for top position (±20-25°) local_roll = float(np.median(local_rolls)) roll_deg = float(np.clip(local_roll, -25.0, 25.0)) dbg(f"[WRIST] Local roll at top: {roll_deg:.1f}° (setup: {setup_roll_deg:.1f}°)") else: dbg(f"[WRIST] Using setup roll: {roll_deg:.1f}° (no local roll data)") # FIXED: Calculate hinge with moving window average around top frame for smoothness hinge_top = None if top_frame is not None: # Use ±2 frames around top for moving window average (5-frame window) win = [f for f in range(max(backswing_frames[0], top_frame-2), min(backswing_frames[-1], top_frame+2)+1) if f in backswing_frames] valid_hinges = [] for f in win: landmarks = sm.get(f) if landmarks: h = _hinge2d_shaft(landmarks, handedness, roll_deg) if h is not None: # h here is now acute angle (angle between forearm and shaft) if 10.0 <= h <= 120.0: # plausible range for acute angles valid_hinges.append((f, h)) dbg(f"[WRIST] frame {f}: hinge_acute={h:.1f}°") if valid_hinges: # Use moving window average of valid hinge values for smoothness hinge_values = [h for _, h in valid_hinges] if len(hinge_values) >= 3: # Remove outliers and take median for robustness hinge_array = np.array(hinge_values) median_val = np.median(hinge_array) mad = np.median(np.abs(hinge_array - median_val)) outlier_threshold = median_val + 2.5 * mad # Remove values more than 2.5 MAD from median filtered_hinges = [h for h in hinge_values if abs(h - median_val) <= 2.5 * mad] if filtered_hinges: hinge_top = float(np.mean(filtered_hinges)) # Use mean of filtered values dbg(f"[WRIST] Smoothed hinge (mean of {len(filtered_hinges)} values): {hinge_top:.1f}°") else: hinge_top = float(median_val) else: # Not enough data for robust averaging, use median hinge_top = float(np.median(hinge_values)) dbg(f"[WRIST] Limited data, using median of {len(hinge_values)} values: {hinge_top:.1f}°") else: # Fallback: find best single frame best = (top_frame, None) for f in win: landmarks = sm.get(f) if landmarks: h = _hinge2d_shaft(landmarks, handedness, roll_deg) if h is not None and (best[1] is None or h > best[1]): best = (f, h) top_frame, hinge_top = best # Additional validation to catch landmark selection errors (now acute angles) if hinge_top is not None and (hinge_top < 5.0 or hinge_top > 120.0): dbg(f"WARNING: Suspicious acute hinge angle {hinge_top:.1f}° - possible landmark error") hinge_top = np.clip(hinge_top, 20.0, 90.0) # Conservative clamp for acute angles dbg(f"top_selection: selected={top_frame}") # FIXED: P3 frame and hinge with better validation p3_frame = _pick_p3_lead_arm_parallel(sm, backswing_frames, handedness) hinge_p3 = None if p3_frame is not None: p3_landmarks = sm.get(p3_frame) if p3_landmarks: hinge_p3 = _hinge2d_shaft(p3_landmarks, handedness, roll_deg) # Validate P3 hinge angle is reasonable (now acute angle) if hinge_p3 is not None and (hinge_p3 < 10.0 or hinge_p3 > 120.0): dbg(f"WARNING: Suspicious P3 acute hinge angle {hinge_p3:.1f}° - possible landmark error") # FIXED: Address baseline with improved landmark validation addr_frames = _pick_stable_setup_frames(sm, swing_phases, max_frames=6) or _address_frames(swing_phases) addr_vals = [] for f in addr_frames[:6]: addr_landmarks = sm.get(f) if addr_landmarks: h = _hinge2d_shaft(addr_landmarks, handedness, roll_deg) # Remove address hinge range filter - accept all valid measurements if h is not None: addr_vals.append(h) hinge_addr = float(np.median(addr_vals)) if addr_vals else None # If top hinge failed, borrow P3; if that also failed, borrow address if hinge_top is None: if hinge_p3 is not None: hinge_top = hinge_p3 dbg(f"hinge_top was None, borrowed from P3: {hinge_top}") elif hinge_addr is not None: hinge_top = hinge_addr # last resort (won't be graded as "top", but avoids None) dbg(f"hinge_top was None, borrowed from address: {hinge_top}") # Remove auto-promotion to avoid convergence bias # Keep original values to show real differences # hinge_top and hinge_p3 are now acute angles (forearm-shaft angle) theta_top = hinge_top # No conversion needed, already acute theta_p3 = hinge_p3 # No conversion needed, already acute # Debug prints to verify shaft angles if theta_top is not None: dbg(f"[WRIST-CHECK] theta_top = {theta_top:.1f}° (acute angle between forearm & shaft)") # Also calculate obtuse angles for backward compatibility and debugging obtuse_top = None if hinge_top is None else (180.0 - hinge_top) obtuse_p3 = None if hinge_p3 is None else (180.0 - hinge_p3) # Enhanced debug output with window information window_size = len([f for f in range(max(backswing_frames[0], top_frame-2), min(backswing_frames[-1], top_frame+2)+1) if f in backswing_frames]) if top_frame else 0 hinge_str = f"{hinge_top:.1f}" if hinge_top is not None else "None" dbg(f"[WRIST] samples={window_size}, mean={hinge_str}°, window=±2") dbg(f"[WRIST] roll={roll_deg:.1f}°, addr={hinge_addr}, p3={hinge_p3}, top={hinge_top}, " f"Δp3={None if hinge_addr is None or hinge_p3 is None else hinge_p3-hinge_addr}, " f"Δtop={None if hinge_addr is None or hinge_top is None else hinge_top-hinge_addr}") # Trust but verify debug printout if hinge_top is not None: dbg(f"[WRIST] final wrist_hinge_top={hinge_top:.1f}° (handed={handedness})") else: dbg(f"[WRIST] final wrist_hinge_top=None (handed={handedness})") result = { # Report acute angles (what coaches expect: ~50-70° for pros) "forearm_shaft_angle_top_deg": theta_top, "forearm_shaft_angle_p3_deg": theta_p3, "addr_deg": hinge_addr, "delta_deg_p3": (hinge_p3 - hinge_addr) if (hinge_p3 is not None and hinge_addr is not None) else None, "delta_deg_top": (hinge_top - hinge_addr) if (hinge_top is not None and hinge_addr is not None) else None, # NEW: Standardize to acute angles for main reporting "acute_angle_deg_top": hinge_top, "acute_angle_deg_p3": hinge_p3, # Keep obtuse angles for debugging and backward compatibility "obtuse_angle_deg_top": obtuse_top, "obtuse_angle_deg_p3": obtuse_p3, # Flag outside typical pro band for P3 (now using acute angle range) "definition_suspect": (hinge_p3 is None) or not (40 <= hinge_p3 <= 80), # Legacy field for backward compatibility - use acute angle "radial_deviation_deg": hinge_p3 if hinge_p3 is not None else hinge_top } return result def compute_front_facing_metrics(pose_data: Dict[int, List], swing_phases: Dict[str, List], world_landmarks: Optional[Dict[int, List]] = None, frames: Optional[List[np.ndarray]] = None, club: str = "iron", handedness: str = "right") -> Dict[str, Dict[str, Union[float, str, None]]]: """Compute the 4 front-facing golf swing metrics with club-aware grading""" # Calculate individual metrics torso_sidebend_result = calculate_torso_sidebend_at_impact( pose_data, swing_phases, frames=frames, handedness=handedness, world_landmarks=world_landmarks ) # Extract value for compatibility shoulder_tilt_impact = torso_sidebend_result.get("value") if torso_sidebend_result else None hip_sway_top = calculate_hip_sway_at_top(pose_data, swing_phases, world_landmarks) wrist_hinge_result = calculate_wrist_hinge_at_top(pose_data, swing_phases, handedness=handedness) hip_shoulder_sep_result = calculate_hip_shoulder_separation_at_impact(pose_data, swing_phases, handedness) # Extract wrist hinge values (NEW: prioritize forearm-shaft angle measurements) wrist_top_theta = wrist_hinge_result.get('forearm_shaft_angle_top_deg') if wrist_hinge_result else None wrist_p3_theta = wrist_hinge_result.get('forearm_shaft_angle_p3_deg') if wrist_hinge_result else None wrist_p3 = wrist_hinge_result.get('radial_deviation_deg_p3') if wrist_hinge_result else None wrist_top = wrist_hinge_result.get('radial_deviation_deg_top') if wrist_hinge_result else None wrist_addr = wrist_hinge_result.get('addr_deg') if wrist_hinge_result else None delta_p3 = wrist_hinge_result.get('delta_deg_p3') if wrist_hinge_result else None delta_top = wrist_hinge_result.get('delta_deg_top') if wrist_hinge_result else None definition_suspect = wrist_hinge_result.get('definition_suspect', False) if wrist_hinge_result else False # Initialize to avoid undefined locals wrist_final = None wrist_kind = None # Prefer radial deviation (hinge), then P3 radial, then θ as fallback if wrist_top is not None: wrist_final = wrist_top wrist_kind = "radial_deviation_top" elif wrist_p3 is not None: wrist_final = wrist_p3 wrist_kind = "radial_deviation_p3" elif wrist_top_theta is not None: wrist_final = wrist_top_theta wrist_kind = "forearm_shaft_angle_top" else: wrist_final = wrist_p3_theta wrist_kind = "forearm_shaft_angle_p3" # Add visible version tag and debug info print(f"[FF] {METRICS_VERSION} running (debug={'on' if VERBOSE else 'off'})") # Log metrics summary for debugging dbg(f"[METRIC] torso_side_bend={shoulder_tilt_impact}, sway_top_in={hip_sway_top}, hinge_top={wrist_hinge_result.get('radial_deviation_deg_top') if wrist_hinge_result else None}, hip_sho_sep={hip_shoulder_sep_result.get('hip_shoulder_sep_deg') if hip_shoulder_sep_result else None}") front_facing_metrics = { 'torso_side_bend_deg': { 'value': shoulder_tilt_impact, 'metric': torso_sidebend_result.get("metric") if torso_sidebend_result else None, 'quality': torso_sidebend_result.get("quality") if torso_sidebend_result else None, 'details': torso_sidebend_result.get("details") if torso_sidebend_result else None }, 'shoulder_tilt_impact_deg': {'value': shoulder_tilt_impact}, # Deprecated, alias for compatibility 'hip_sway_top_inches': {'value': hip_sway_top}, 'hip_shoulder_separation_impact_deg': { 'value': hip_shoulder_sep_result.get('hip_shoulder_sep_deg') if hip_shoulder_sep_result else None, 'abs_deg': hip_shoulder_sep_result.get('hip_shoulder_sep_abs_deg') if hip_shoulder_sep_result else None, 'confidence': hip_shoulder_sep_result.get('confidence') if hip_shoulder_sep_result else 0.0 }, 'wrist_hinge_top_deg': { 'value': wrist_final, 'value_kind': wrist_kind, 'forearm_shaft_angle_top_deg': wrist_top_theta, 'forearm_shaft_angle_p3_deg': wrist_p3_theta, 'radial_deviation_p3_deg': wrist_p3, 'radial_deviation_top_deg': wrist_top, 'addr_deg': wrist_addr, 'delta_p3_deg': delta_p3, 'delta_top_deg': delta_top, 'definition_suspect': definition_suspect }, '__version__': METRICS_VERSION, '_debug': { 'version': METRICS_VERSION, 'used_world': bool(world_landmarks), 'setup_frames': swing_phases.get('setup', [])[:8], 'impact_frames': swing_phases.get('impact', []), } } return front_facing_metrics