Spaces:
Paused
Paused
| """ | |
| Front-Facing Golf Swing Metrics Calculator | |
| This module contains all the mathematical calculations for front-facing golf swing metrics. | |
| The metrics_calculator.py file handles DTL (Down-the-Line) view metrics. | |
| """ | |
| import math | |
| import numpy as np | |
| import cv2 | |
| from typing import Dict, List, Tuple, Optional, Union | |
# Landmark indices (MediaPipe Pose numbering)
L_HIP, R_HIP = 23, 24  # left / right hip
L_SHO, R_SHO = 11, 12  # left / right shoulder
L_EYE, R_EYE = 2, 5    # left / right eye
# Debug helper - set to True to see detailed metric calculations
VERBOSE = False
def set_debug(on: bool = True):
    """Toggle module-wide verbose debug output (see dbg)."""
    global VERBOSE
    VERBOSE = bool(on)
def dbg(msg):
    """Print msg with a 'DEBUG: ' prefix when module-level VERBOSE is enabled."""
    if VERBOSE:
        print(f"DEBUG: {msg}")
# Version tag for tracking code changes (bump whenever a metric definition changes)
METRICS_VERSION = "front-ff v2025-09-26-added-hip-shoulder-separation-4-metrics"
| def _best_foot_pair(lm, want_world=False): | |
| """ | |
| Return a robust (left,right) foot pair tuple for TOES/FOOT-INDEX with highest availability. | |
| In MediaPipe Pose: | |
| 29 = L_HEEL, 30 = R_HEEL, 31 = L_FOOT_INDEX (toe tip), 32 = R_FOOT_INDEX (toe tip) | |
| We prefer (31,32). If missing, fall back to (29,30). | |
| """ | |
| if lm is None: return None | |
| # World landmarks have 3D tuples; image landmarks have [x,y,vis] | |
| def ok(pt): | |
| if not pt: return False | |
| if want_world: | |
| return len(pt) >= 3 and all(np.isfinite(pt[:3])) | |
| else: | |
| return len(pt) >= 3 and pt[2] >= 0.4 and np.isfinite(pt[0]) and np.isfinite(pt[1]) | |
| # Prefer FOOT_INDEX (toes) | |
| l_idx, r_idx = 31, 32 | |
| if (len(lm) > r_idx) and ok(lm[l_idx]) and ok(lm[r_idx]): | |
| return (l_idx, r_idx) | |
| # Fall back to HEEL | |
| l_idx, r_idx = 29, 30 | |
| if (len(lm) > r_idx) and ok(lm[l_idx]) and ok(lm[r_idx]): | |
| return (l_idx, r_idx) | |
| return None | |
def _toe_line_unit_img(lm, H, W, roll_deg):
    """
    2D: unit toe-line vector (left -> right foot) in the de-rolled image frame.

    Uses the best available pair from _best_foot_pair ((31,32) or (29,30)).
    Returns (ux, uy) or None when feet are unavailable or degenerate.
    """
    pair = _best_foot_pair(lm, want_world=False)
    if not pair:
        return None
    iL, iR = pair
    raw_dx = lm[iR][0] - lm[iL][0]
    raw_dy = lm[iR][1] - lm[iL][1]
    # Heuristic: deltas <= 2.0 look like normalized coordinates, so scale to pixels.
    looks_normalized = max(abs(raw_dx), abs(raw_dy)) <= 2.0
    dx = raw_dx * (W if looks_normalized else 1.0)
    dy = raw_dy * (H if looks_normalized else 1.0)
    cr, sr = math.cos(math.radians(roll_deg)), math.sin(math.radians(roll_deg))
    tx, ty = (cr * dx + sr * dy, -sr * dx + cr * dy)  # de-rolled
    n = math.hypot(tx, ty)
    if n < 1e-6:
        return None
    return (tx / n, ty / n)
def _toe_line_unit_world(wf):
    """
    3D/XZ: unit toe-line vector (right minus left) projected on the ground (XZ) plane.

    Prefers toes (31,32), falls back to heels (29,30); None if unavailable.
    """
    pair = _best_foot_pair(wf, want_world=True)
    if pair is None:
        return None
    iL, iR = pair
    dx = wf[iR][0] - wf[iL][0]
    dz = wf[iR][2] - wf[iL][2]
    length = math.hypot(dx, dz)
    if length < 1e-6:
        return None
    return (dx / length, dz / length)
| # Core math utilities | |
def wrap180(a):
    """Wrap an angle in degrees into [-180, 180)."""
    shifted = (a + 180.0) % 360.0
    return shifted - 180.0
def unit2(v):
    """Normalize a 2D vector; returns (0.0, 1.0) when the norm is ~zero."""
    x, y = v[0], v[1]
    n = (x * x + y * y) ** 0.5
    if n > 1e-6:
        return (x / n, y / n)
    return (0.0, 1.0)
def signed_angle2(u, v):
    """Signed angle from u to v in degrees, in (-180, 180] (CCW positive)."""
    cross = u[0] * v[1] - u[1] * v[0]
    dot = u[0] * v[0] + u[1] * v[1]
    return math.degrees(math.atan2(cross, dot))
| def _eye_roll_deg(lx, ly, rx, ry): | |
| """Calculate angle of the line from right eye to left eye""" | |
| return math.degrees(math.atan2(ly - ry, lx - rx)) | |
| def _midpt(a, b): | |
| """Calculate midpoint of two 2D points""" | |
| return ((a[0] + b[0]) * 0.5, (a[1] + b[1]) * 0.5) | |
def smooth_landmarks(pose_data: Dict[int, List], alpha: float = 0.35) -> Dict[int, List]:
    """
    Apply exponential moving average (EMA) smoothing to per-frame landmarks.

    Args:
        pose_data: mapping frame_index -> list of [x, y, visibility] landmarks.
        alpha: EMA weight of the current frame (higher = less smoothing).

    Returns:
        A new dict with the same keys and smoothed [x, y, vis] entries.

    Fix over the original: the previous frame is the previous *processed key
    in sorted order*, not the literal ``frame_idx - 1``, so videos with
    dropped/non-contiguous frame indices still get smoothed. Behavior is
    unchanged for contiguous frame sequences.
    """
    if not pose_data:
        return pose_data
    smoothed_data: Dict[int, List] = {}
    prev_idx = None  # last frame index actually written to smoothed_data
    for frame_idx in sorted(pose_data.keys()):
        landmarks = pose_data[frame_idx]
        if not landmarks or not isinstance(landmarks, list):
            continue
        prev_lms = smoothed_data.get(prev_idx) if prev_idx is not None else None
        smoothed_landmarks = []
        for i, lm in enumerate(landmarks):
            if lm and isinstance(lm, list) and len(lm) >= 3:
                x, y, vis = lm[0], lm[1], lm[2]
                # Blend with the previous smoothed value when one exists.
                if (prev_lms is not None and i < len(prev_lms) and prev_lms[i]
                        and isinstance(prev_lms[i], list) and len(prev_lms[i]) >= 3):
                    px, py, pv = prev_lms[i][0], prev_lms[i][1], prev_lms[i][2]
                    x = alpha * x + (1 - alpha) * px
                    y = alpha * y + (1 - alpha) * py
                    vis = alpha * vis + (1 - alpha) * pv
                smoothed_landmarks.append([x, y, vis])
            else:
                # Unusable landmark entry: pass through unchanged.
                smoothed_landmarks.append(lm)
        smoothed_data[frame_idx] = smoothed_landmarks
        prev_idx = frame_idx
    return smoothed_data
def has_world_ok_scale(world_landmarks, frame_idx=None):
    """
    Check whether world landmarks carry a plausible metric (human-sized) scale.

    Accepts the data when, for any checked frame (a given frame_idx, else the
    first five keys), either the hip span falls in 0.20-0.70 m or the shoulder
    span falls in 0.25-0.70 m, both measured in the ground (XZ) plane.
    """
    if not world_landmarks:
        return False
    frames_to_check = [frame_idx] if frame_idx is not None else list(world_landmarks.keys())[:5]

    def _xz_span(wf, a, b):
        # Horizontal (XZ-plane) distance between two landmarks, or None if missing.
        if wf[a] and wf[b]:
            return math.hypot(wf[b][0] - wf[a][0], wf[b][2] - wf[a][2])
        return None

    for f in frames_to_check:
        if f not in world_landmarks:
            continue
        wf = world_landmarks[f]
        if not wf or len(wf) < 25:
            continue
        hip_span = _xz_span(wf, 23, 24)  # L_HIP, R_HIP
        if hip_span is not None and 0.20 <= hip_span <= 0.70:
            dbg(f"[SCALE] Hip span {hip_span:.3f}m is reasonable")
            return True
        sho_span = _xz_span(wf, 11, 12)  # L_SHO, R_SHO
        if sho_span is not None and 0.25 <= sho_span <= 0.70:
            dbg(f"[SCALE] Shoulder span {sho_span:.3f}m is reasonable")
            return True
    # Nothing passed; report why the first checked frame failed.
    if frames_to_check:
        f = frames_to_check[0]
        if f in world_landmarks:
            wf = world_landmarks[f]
            if wf and len(wf) >= 25:
                hip_span = _xz_span(wf, 23, 24)
                if hip_span is not None:
                    dbg(f"[SCALE] Hip span {hip_span:.3f}m out of range [0.20, 0.70]")
                sho_span = _xz_span(wf, 11, 12)
                if sho_span is not None:
                    dbg(f"[SCALE] Shoulder span {sho_span:.3f}m out of range [0.25, 0.70]")
    return False
def unit3(v):
    """Normalize a 3D vector; returns the +Z unit vector for near-zero input."""
    norm = math.sqrt(v[0] * v[0] + v[1] * v[1] + v[2] * v[2])
    if norm > 1e-6:
        return np.array(v) / norm
    return np.array([0.0, 0.0, 1.0])
def world_vec(world_landmarks, frame_idx, landmark_name):
    """
    Fetch a 3D world-space vector for a named landmark at a given frame.

    'C7' is approximated as the shoulder midpoint raised by 8 cm; when the
    shoulders are unavailable it falls through to index 0 (nose) via the map.
    Returns an np.ndarray of shape (3,), or None when data is missing.
    """
    landmark_map = {'C7': 0, 'L_HIP': 23, 'R_HIP': 24}  # C7 approximated as nose for now
    if landmark_name == 'C7':
        # Approximate C7 as the shoulder midpoint, lifted 8 cm toward the neck.
        if (frame_idx in world_landmarks
                and len(world_landmarks[frame_idx]) > 12
                and world_landmarks[frame_idx][11]
                and world_landmarks[frame_idx][12]):
            left = np.array(world_landmarks[frame_idx][11][:3])
            right = np.array(world_landmarks[frame_idx][12][:3])
            return (left + right) / 2 + np.array([0, 0.08, 0])  # 8cm higher
    idx = landmark_map.get(landmark_name)
    if idx is None or frame_idx not in world_landmarks:
        return None
    wf = world_landmarks[frame_idx]
    if not wf or len(wf) <= idx or not wf[idx]:
        return None
    return np.array(wf[idx][:3])
def any_nan(arr):
    """Return True when arr is None or contains at least one NaN value."""
    if arr is None:
        return True
    return np.any(np.isnan(arr))
def calibrate_sidebend_sign(world_landmarks, impact_idx, handedness, fwd_hat_3d):
    """
    Returns +1 or -1: multiplier to convert 2D dx sign into 'away-from-target = +'
    Uses short window around impact to vote with 3D sign, falls back to handed rule.

    Parameters: world_landmarks maps frame -> 3D landmarks, impact_idx is the
    impact frame index, handedness is 'right' or 'left', fwd_hat_3d is a
    forward unit vector in world XZ (may be None; np.cross then fails and the
    caller relies on the scale gate above — NOTE(review): verify fwd_hat_3d
    is never None when scale is OK).
    """
    # Without trustworthy world-space scale, fall back to the pure handedness rule.
    if not has_world_ok_scale(world_landmarks):
        return 1 if handedness == 'right' else -1
    # NOTE(review): len(list(keys)) is the frame COUNT, not the max frame index —
    # this upper bound is only correct for contiguous 0-based frame dicts; confirm.
    W = range(max(0, impact_idx-2), min(len(list(world_landmarks.keys())), impact_idx+3))
    votes = 0
    for i in W:
        if i not in world_landmarks:
            continue
        c7 = world_vec(world_landmarks, i, 'C7')
        l_hip = world_vec(world_landmarks, i, 'L_HIP')
        r_hip = world_vec(world_landmarks, i, 'R_HIP')
        if c7 is None or l_hip is None or r_hip is None:
            continue
        hip = (l_hip + r_hip) * 0.5
        # Unit spine direction from hip midpoint up toward ~C7.
        spine = unit3(c7 - hip)
        # lateral-right unit on ground plane from forward
        lat = unit3(np.cross([0,1,0], fwd_hat_3d))
        s3d = np.sign(np.dot(spine, lat))  # + means leaning to player's right
        votes += s3d
    # majority 3D says which side is player-right; convert to 'away from target = +'
    # For RH, away-from-target == player-right; for LH, opposite.
    # NOTE(review): np.sign yields a float (+/-1.0), not an int; callers multiply
    # by it, so the type difference is benign.
    prefer_player_right_positive = np.sign(votes) if votes != 0 else 1
    return prefer_player_right_positive if handedness == 'right' else -prefer_player_right_positive
def calculate_torso_sidebend_at_impact(pose_data, swing_phases, frames=None, image_shape=None, handedness='right', world_landmarks=None):
    """
    Front-facing: torso side-bend in degrees at impact.
    Defined as angle between (neck midpoint → hip midpoint) and screen-vertical,
    computed in derolled screen space.

    Returns a metric dict {"metric", "value", "quality", "details"} where
    value is signed degrees ('away from target = +' after sign calibration)
    or None with quality "no-data".
    """
    dbg(f"[SB] handed={handedness}")
    # --- Resolve impact frame (with a tiny +δ tolerance) ---
    impact_frames = swing_phases.get("impact", [])
    if impact_frames:
        impact_idx = int(round(impact_frames[0]))
    else:
        # Fallback: find impact using peak hand speed if no swing phases provided
        impact_idx = _find_impact_frame(pose_data, world_landmarks)
        if impact_idx is None:
            impact_idx = sorted(pose_data.keys())[-1] if pose_data else None
    # Surgical asserts to guarantee true impact frame
    # NOTE(review): asserts are stripped under `python -O`; consider raising instead.
    assert impact_idx is not None, "[SB] impact_idx None (missing world_landmarks pass-thru?)"
    # pose_data is usually a frame->landmarks dict; the "landmarks" key handles a
    # wrapped format — presumably from an alternate caller; TODO confirm.
    n = len(pose_data["landmarks"]) if "landmarks" in pose_data else len(pose_data)
    assert impact_idx < n-1, f"[SB] Using last frame ({impact_idx}/{n-1}) — not true impact"
    dbg(f"[SB] impact_idx={impact_idx} of {n} frames (OK)")
    if impact_idx is None:  # unreachable after the assert above; kept for safety
        dbg("[TORSO] No impact frame found")
        return {"metric": "torso_side_bend_deg", "value": None, "quality": "no-data"}
    num_frames = len(pose_data["landmarks"]) if "landmarks" in pose_data else len(pose_data)
    # TEMPORAL SMOOTHING: Use ±2-frame window around impact for better timing
    candidates = [impact_idx - 2, impact_idx - 1, impact_idx, impact_idx + 1, impact_idx + 2]
    candidates = [i for i in candidates if 0 <= i < num_frames and i in pose_data]
    # --- Improved de-roll using eyes + hips (not eyes-only) to reduce drift ---
    lo = max(0, impact_idx - 4)
    hi = min(num_frames - 1, impact_idx + 5)
    combined_rolls = []
    for i in range(lo, hi + 1):
        if i not in pose_data:
            continue
        # Local index constants (shadow the module-level ones).
        L_EYE, R_EYE = 2, 5
        L_HIP, R_HIP = 23, 24
        lm = pose_data[i]
        if not lm or len(lm) <= max(R_EYE, R_HIP):
            continue
        # Collect both eye and hip roll angles
        frame_rolls = []
        # Eye roll
        le, re = lm[L_EYE], lm[R_EYE]
        if le is not None and re is not None:
            (lx, ly), (rx, ry) = le[:2], re[:2]
            frame_rolls.append(_eye_roll_deg(lx, ly, rx, ry))
        # Hip roll (same right->left line formula as the eye roll)
        lh, rh = lm[L_HIP], lm[R_HIP]
        if lh is not None and rh is not None:
            (lx, ly), (rx, ry) = lh[:2], rh[:2]
            frame_rolls.append(_eye_roll_deg(lx, ly, rx, ry))
        # Use median of available rolls for this frame
        if frame_rolls:
            combined_rolls.append(np.median(frame_rolls))
    roll_deg = 0.0
    if combined_rolls:
        # Trim to the 15th-85th percentile band, clip to ±25°, then take the median.
        arr = np.array(combined_rolls, dtype=float)
        lo_q, hi_q = np.percentile(arr, [15, 85])
        arr = arr[(arr >= lo_q) & (arr <= hi_q)]
        roll_deg = float(np.median(np.clip(arr, -25.0, 25.0)))
    dbg(f"[TORSO] eyes+hips roll (trimmed median) = {roll_deg:.1f}°")
    # --- Choose best candidate (prefer middle of temporal window) ---
    valid_candidates = []
    L_SHO, R_SHO, L_HIP, R_HIP = 11, 12, 23, 24
    for i in candidates:
        lm = pose_data[i]
        if not lm or len(lm) <= max(L_SHO, R_SHO, L_HIP, R_HIP):
            continue
        ls = lm[L_SHO]
        rs = lm[R_SHO]
        lh = lm[L_HIP]
        rh = lm[R_HIP]
        if None not in (ls, rs, lh, rh):
            valid_candidates.append(i)
    if not valid_candidates:
        dbg("[TORSO] No valid landmarks around impact")
        return {"metric": "torso_side_bend_deg", "value": None, "quality": "no-data"}
    # TEMPORAL MEDIAN: Calculate side-bend from all valid frames for stability
    def _get_HW():
        # Resolve image height/width from frames or image_shape; (None, None) if unknown.
        if frames:
            H, W = frames[0].shape[:2]; return float(H), float(W)
        if image_shape and len(image_shape) >= 2:
            H, W = image_shape[:2]; return float(H), float(W)
        return None, None
    H, W = _get_HW()
    cx, cy = (W / 2.0 if W else 0.0), (H / 2.0 if H else 0.0)
    rad = math.radians(-roll_deg)
    cr, sr = math.cos(rad), math.sin(rad)
    def _deroll(pt):
        # Rotate a point by -roll_deg about the image center.
        x, y = pt[:2]  # Handle both [x,y] and [x,y,vis] formats
        x0, y0 = x - cx, y - cy
        xr = cr * x0 - sr * y0 + cx
        yr = sr * x0 + cr * y0 + cy
        return (xr, yr)
    # Calculate side-bend for all valid frames and take median
    L_SHO, R_SHO, L_HIP, R_HIP = 11, 12, 23, 24
    sidebend_values = []
    for frame_idx in valid_candidates:
        lm = pose_data[frame_idx]
        ls = _deroll(lm[L_SHO])
        rs = _deroll(lm[R_SHO])
        lh = _deroll(lm[L_HIP])
        rh = _deroll(lm[R_HIP])
        neck = _midpt(ls, rs)
        midhip = _midpt(lh, rh)
        dx = neck[0] - midhip[0]
        dy = neck[1] - midhip[1]  # y grows downward
        # 2D frontal-plane magnitude only
        dy = abs(dy) + 1e-6  # Ensure positive denominator
        sidebend_abs_frame = math.degrees(math.atan2(abs(dx), dy))
        sidebend_values.append((dx, sidebend_abs_frame))  # Store both dx and magnitude
    # Use median magnitude and median dx sign
    if sidebend_values:
        dx_values = [sv[0] for sv in sidebend_values]
        abs_values = [sv[1] for sv in sidebend_values]
        dx = np.median(dx_values)  # Median dx for sign
        sidebend_abs = np.median(abs_values)  # Median magnitude
        chosen = valid_candidates[len(valid_candidates)//2]  # For debugging reference
    else:
        return {"metric": "torso_side_bend_deg", "value": None, "quality": "no-data"}
    # Get forward direction for sign calibration
    fwd_hat_3d = None
    if world_landmarks is not None:
        setup_frames = swing_phases.get("setup", [])[:5] or sorted(world_landmarks.keys())[:5]
        for f in setup_frames:
            if f in world_landmarks:
                toe_vec_3d = _toe_line_unit_world(world_landmarks[f])
                if toe_vec_3d is not None:
                    # Forward perpendicular to toe line in XZ plane
                    fwd_hat_3d = np.array([-toe_vec_3d[1], 0, toe_vec_3d[0]])
                    break
    # Standardized sign convention: positive toward target for right-handed
    sign_2d_raw = 1.0 if dx > 0 else -1.0
    cal = calibrate_sidebend_sign(world_landmarks, impact_idx, handedness, fwd_hat_3d)
    # Apply standardized sign convention: positive = away from target
    # For right-handed: away from target = toward player's right
    # For left-handed: away from target = toward player's left
    final_sign = sign_2d_raw * cal
    sidebend_deg = final_sign * sidebend_abs
    # Guardrails: clip extreme values but warn
    if abs(sidebend_deg) > 30:
        dbg(f"[SB] rejecting |sb|={abs(sidebend_deg):.1f}° > 30°, check roll/coords")
        sidebend_deg = math.copysign(30.0, sidebend_deg)
    # Debug output matching expected format
    dbg(f"[SB] 2D mag={sidebend_abs:.1f}°, cal={cal:+.0f}, final={sidebend_deg:.1f}°")
    # NOTE(review): dy was already abs'ed above, so abs(dy) here is redundant, and
    # dx/dy are the medians/last-frame values — the printout mixes frames; confirm intent.
    dbg(f"[TORSO] side-bend = {sidebend_deg:.1f}° (dx={dx:.1f}, dy={abs(dy):.1f}, abs={sidebend_abs:.1f}°)")
    # Optional: shoulder drop % of head height (kept because it's useful)
    head_h = pose_data.get("head_h", {}).get(chosen)
    drop_pct = None
    if head_h and head_h > 0:
        # lead vs trail by handedness; y increases downward in image coords
        # NOTE(review): ls/rs here are leftovers from the LAST candidate frame in
        # the loop above, not from `chosen` — verify this is the intended frame.
        lead_y = ls[1] if handedness == 'right' else rs[1]
        trail_y = rs[1] if handedness == 'right' else ls[1]
        dy_sho = (lead_y - trail_y)
        drop_pct = 100.0 * (dy_sho / head_h)
        dbg(f"[TORSO] lead-shoulder drop = {drop_pct:.1f}% head")
    # Trust but verify debug printout
    dbg(f"[SB] final torso_sidebend={sidebend_deg:.1f}° at frame={chosen}")
    return {
        "metric": "torso_side_bend_deg",
        "value": sidebend_deg,
        "quality": "ok",
        "details": {
            "frame": chosen,
            "roll_used_deg": roll_deg,
            "shoulder_drop_pct_head": drop_pct
        }
    }
def calculate_shoulder_tilt_at_impact(*args, **kwargs):
    """
    DEPRECATED: shoulder tilt was removed due to occlusion instability.

    Delegates to calculate_torso_sidebend_at_impact and returns only its
    numeric "value", preserving the old scalar return for legacy callers.
    """
    dbg("[DEPRECATION] Shoulder tilt removed. Returning torso side-bend instead.")
    result = calculate_torso_sidebend_at_impact(*args, **kwargs)
    # Legacy consumers expect a bare number, not the metric dict.
    return result["value"]
| def _find_impact_frame(pose_data, world_landmarks=None): | |
| """Find impact frame using peak hand speed if not provided in swing phases""" | |
| if not world_landmarks: | |
| return None | |
| frames = sorted(world_landmarks.keys()) | |
| if len(frames) < 3: | |
| return None | |
| speeds = [] | |
| for i in range(1, len(frames)): | |
| f_curr = frames[i] | |
| f_prev = frames[i-1] | |
| curr_frame = world_landmarks.get(f_curr) | |
| prev_frame = world_landmarks.get(f_prev) | |
| if (curr_frame and len(curr_frame) > 16 and | |
| prev_frame and len(prev_frame) > 16 and | |
| curr_frame[16] and prev_frame[16]): # Right wrist | |
| curr_wrist = curr_frame[16] | |
| prev_wrist = prev_frame[16] | |
| # Calculate 3D speed | |
| dx = curr_wrist[0] - prev_wrist[0] | |
| dy = curr_wrist[1] - prev_wrist[1] | |
| dz = curr_wrist[2] - prev_wrist[2] | |
| speed = math.sqrt(dx*dx + dy*dy + dz*dz) | |
| speeds.append((f_curr, speed)) | |
| if speeds: | |
| # Find peak speed frame | |
| peak_frame = max(speeds, key=lambda x: x[1])[0] | |
| # Impact is typically 1 frame before peak speed | |
| impact_frame = peak_frame - 1 | |
| return impact_frame if impact_frame in frames else peak_frame | |
| return None | |
def _target_hat_from_toes_world(world_landmarks, setup_frames, handedness='right'):
    """
    Build a stable target-line axis (unit 2D vector in world XZ) from the toe
    line at address, trimming the 20% widest-angle outlier samples.

    Returns (1.0, 0.0) when no usable toe pairs exist; the axis is flipped for
    left-handed players so +X stays toward the target.
    """
    LEFT_TOE, RIGHT_TOE = 31, 32
    samples = []
    for f in setup_frames:
        if f not in world_landmarks:
            continue
        frame = world_landmarks[f]
        if not frame or len(frame) <= 32:
            continue
        try:
            left, right = frame[LEFT_TOE], frame[RIGHT_TOE]
            if left and right and len(left) >= 3 and len(right) >= 3:
                vec = (right[0] - left[0], right[2] - left[2])
                if (vec[0] * vec[0] + vec[1] * vec[1]) > 1e-6:
                    samples.append(vec)
        except Exception:
            continue
    if not samples:
        dbg("target_axis - NO VALID TOE VECTORS, using default")
        return (1.0, 0.0)
    # Drop the 20% of samples whose angle deviates most from the median angle.
    samples = np.array(samples, dtype=float)
    angles = np.degrees(np.arctan2(samples[:, 1], samples[:, 0]))
    deviation = np.abs(angles - np.median(angles))
    kept = deviation <= np.percentile(deviation, 80)
    target_hat = unit2(np.median(samples[kept], axis=0))
    # Optional: flip sign for consistency if you want +X toward target for left-handed
    if handedness == 'left':
        target_hat = (-target_hat[0], -target_hat[1])
    return target_hat
| def _pick_stable_setup_frames(sm, swing_phases, max_frames=6): | |
| """Pick first address frames with low pelvis motion (reduces waggle bias).""" | |
| cand = swing_phases.get("setup", [])[:12] | |
| if not cand: | |
| return [] | |
| def pelvis_ctr_px(f): | |
| lm = sm.get(f) | |
| if not lm or not lm[23] or not lm[24]: return None | |
| return np.array([(lm[23][0]+lm[24][0])*0.5, (lm[23][1]+lm[24][1])*0.5], float) | |
| centers = [(f, pelvis_ctr_px(f)) for f in cand] | |
| centers = [(f, c) for f,c in centers if c is not None] | |
| if len(centers) < 2: return [f for f,_ in centers][:max_frames] | |
| # speed threshold: keep earliest frames whose step-to-step pelvis motion is small | |
| speeds = [] | |
| for i in range(1, len(centers)): | |
| f0,c0 = centers[i-1]; f1,c1 = centers[i] | |
| speeds.append((f1, float(np.linalg.norm(c1-c0)))) | |
| if not speeds: return [centers[0][0]] | |
| # pick prefix until motion inflates (waggle begins) | |
| thr = max(1.5, np.median([s for _,s in speeds])*2.5) | |
| keep = [centers[0][0]] | |
| for f,s in speeds: | |
| if s <= thr and len(keep) < max_frames: | |
| keep.append(f) | |
| else: | |
| break | |
| return keep | |
def _build_foot_only_homography(sm, setup_frames):
    """
    Build ground-plane H from 4 foot points only (no hips).
    MediaPipe Pose indices: 27=L_ankle, 28=R_ankle, 29=L_heel, 30=R_heel,
    31=L_foot_index (toe tip), 32=R_foot_index (toe tip).
    Image pts use heels (29,30) and toes (31,32).
    Ground pts are a rectangle in arbitrary inches; scale will be calibrated later.

    Returns (H, True, frame_index) for the first setup frame yielding a
    non-degenerate homography, else (None, False, None).
    """
    import cv2
    for f in setup_frames:
        lm = sm.get(f)
        if not lm: continue
        # All four foot points must exist with visibility >= 0.5.
        needed = [29,30,31,32]
        if any((i>=len(lm) or lm[i] is None or lm[i][2] < 0.5) for i in needed):
            continue
        # Image points - heels then toes (left, right), matching `ground` below.
        img = np.array([
            [lm[29][0], lm[29][1]],  # L_heel
            [lm[30][0], lm[30][1]],  # R_heel
            [lm[31][0], lm[31][1]],  # L_foot_index -> toe
            [lm[32][0], lm[32][1]],  # R_foot_index -> toe
        ], dtype=np.float32)
        # Ground rectangle (left/right must match the image ordering above):
        # nominal 16in stance width x 11in foot length; units are arbitrary
        # until the scale calibration mentioned in the docstring.
        ground = np.array([
            [-8.0, 0.0],  # L heel
            [ 8.0, 0.0],  # R heel
            [-8.0, 11.0], # L toe
            [ 8.0, 11.0], # R toe
        ], dtype=np.float32)
        # NOTE(review): with exactly 4 correspondences the fit is exact, so the
        # RANSAC threshold is largely moot here — confirm that is intended.
        H, mask = cv2.findHomography(img, ground, cv2.RANSAC, 2.0)
        if H is None:
            continue
        # Reject near-singular solutions (degenerate point configurations).
        det = abs(np.linalg.det(H[:2,:2]))
        if det > 1e-5:
            return H, True, f
    return None, False, None
def _simple_pixel_sway_fallback(sm, setup_frames, top_frame, swing_phases=None, world_landmarks=None):
    """Improved pixel-based sway fallback with world landmark scale calibration

    Measures the pelvis-center displacement (pixels) from address to top_frame,
    projects it onto the toe-line axis, and converts to inches using a
    px-per-inch scale estimated from 3D toe spacing when world landmarks are
    available (default 10.0 px/inch otherwise).

    Args:
        sm: dict frame_index -> landmark list ([x, y, vis] per landmark).
        setup_frames: candidate address frame indices.
        top_frame: frame index at the top of the backswing.
        swing_phases: optional phase dict; impact frames disambiguate axis sign.
        world_landmarks: optional dict of 3D landmarks for scale calibration.

    Returns:
        Signed sway in inches along the calibrated target axis, or 0.0 when
        the required landmarks are missing.
    """
    def get_lm(frame_idx):
        return sm.get(frame_idx)
    # pick a setup frame that has feet + hips
    setup_ok = [f for f in setup_frames
                if get_lm(f) and get_lm(f)[23] and get_lm(f)[24] and get_lm(f)[31] and get_lm(f)[32]]
    if not setup_ok:
        return 0.0
    f0 = setup_ok[0]
    lm0 = get_lm(f0)
    # pelvis centers (pixels)
    def pelvis_ctr_px(f):
        lm = get_lm(f)
        if not lm or not lm[23] or not lm[24]:
            return None
        return np.array([(lm[23][0] + lm[24][0]) * 0.5,
                         (lm[23][1] + lm[24][1]) * 0.5], dtype=float)
    setup_ctrs = [pelvis_ctr_px(f) for f in setup_ok]
    setup_ctrs = [c for c in setup_ctrs if c is not None]
    if not setup_ctrs:
        return 0.0
    # Median of address pelvis centers = robust baseline position.
    setup_ctr = np.median(np.stack(setup_ctrs), axis=0)
    top_ctr = pelvis_ctr_px(top_frame)
    if top_ctr is None:
        return 0.0
    # toe vector in pixels from the same setup frame
    toe_v = np.array([lm0[32][0] - lm0[31][0],
                      lm0[32][1] - lm0[31][1]], dtype=float)
    n = np.linalg.norm(toe_v)
    if n < 1e-6:
        # fallback to horizontal approximation if toes are missing
        toe_hat_px = np.array([1.0, 0.0])
    else:
        toe_hat_px = toe_v / n
    # Target axis (toe line itself) in pixel space
    # NOTE(review): toe_hat_px is already unit length, so this re-normalization
    # is a no-op aside from the 1e-9 epsilon.
    target_hat_px = toe_hat_px / (np.linalg.norm(toe_hat_px) + 1e-9)
    # --- NEW: sign disambiguation using address -> impact pelvis centers in pixel space ---
    if swing_phases and swing_phases.get("impact"):
        f_imp = swing_phases["impact"][0]
        # median address pelvis center in px
        addr_ctrs = [pelvis_ctr_px(f) for f in setup_ok]
        addr_ctrs = [c for c in addr_ctrs if c is not None]
        imp_ctr = pelvis_ctr_px(f_imp)
        if addr_ctrs and imp_ctr is not None:
            addr_ctr = np.median(np.stack(addr_ctrs), axis=0)
            fwd_vec_px = imp_ctr - addr_ctr
            # Point the axis in the direction of address->impact pelvis motion.
            if float(np.dot(fwd_vec_px, target_hat_px)) < 0.0:
                dbg("[SWAY] Flipped target_hat_px based on addr→impact motion")
                target_hat_px = -target_hat_px
    delta_px = top_ctr - setup_ctr
    sway_px = float(np.dot(delta_px, target_hat_px))
    # Improved px→inch scale using world landmarks with consistency check
    px_per_inch_candidates = []
    default_px_per_inch = 10.0
    if world_landmarks is not None:
        # Try to get px_per_inch from multiple setup frames
        for f in setup_ok:
            if f in world_landmarks:
                try:
                    wf = world_landmarks[f]
                    lm = get_lm(f)
                    L_FOOT_INDEX, R_FOOT_INDEX = 31, 32  # foot_index points (toe tips)
                    Lt, Rt = wf[L_FOOT_INDEX], wf[R_FOOT_INDEX]
                    if Lt and Rt and lm and lm[31] and lm[32]:
                        # 3D toe spacing in meters → inches
                        d_m = math.hypot(Rt[0]-Lt[0], Rt[2]-Lt[2])
                        d_in_true = d_m * 39.3701
                        # Pixel toe spacing for this frame
                        toe_v_frame = np.array([lm[32][0] - lm[31][0], lm[32][1] - lm[31][1]], dtype=float)
                        d_px = float(np.linalg.norm(toe_v_frame))
                        if d_px > 1e-3 and d_in_true > 1e-3:
                            candidate = d_px / d_in_true
                            # Sanity check: reasonable range for px_per_inch (5-25 depending on resolution/crop)
                            if 5.0 <= candidate <= 25.0:
                                px_per_inch_candidates.append(candidate)
                                dbg(f"pixel_fallback: frame {f} px_per_inch={candidate:.2f}")
                except Exception:
                    continue
    # Apply consistency check: reject frames with >10% deviation from median
    if len(px_per_inch_candidates) > 1:
        initial_median = np.median(px_per_inch_candidates)
        consistent_candidates = []
        for val in px_per_inch_candidates:
            deviation_pct = abs(val - initial_median) / initial_median * 100
            if deviation_pct <= 10.0:  # Within 10% of median
                consistent_candidates.append(val)
            else:
                dbg(f"pixel_fallback: rejecting inconsistent px_per_inch={val:.2f} (deviation={deviation_pct:.1f}%)")
        px_per_inch_candidates = consistent_candidates
    # Use median of candidates if available, otherwise use default
    if px_per_inch_candidates:
        px_per_inch = float(np.median(px_per_inch_candidates))
        dbg(f"pixel_fallback: median px_per_inch={px_per_inch:.2f} from {len(px_per_inch_candidates)} consistent frames")
    else:
        px_per_inch = default_px_per_inch
        dbg(f"pixel_fallback: using default px_per_inch={px_per_inch:.2f}")
    sway_inches = sway_px / px_per_inch
    # Apply same robust baseline approach to pixel fallback:
    # estimate baseline jitter as the MAD of residuals around a linear trend
    # of the address pelvis x-positions (1.4826 converts MAD to sigma).
    if len(setup_ctrs) >= 2:
        setup_x_px = [c[0] for c in setup_ctrs]
        t = np.arange(len(setup_x_px))
        m, b = np.polyfit(t, setup_x_px, 1) if len(setup_x_px) >= 2 else (0, setup_x_px[0])
        residuals = np.array(setup_x_px) - (m * t + b)
        mad_residuals = np.median(np.abs(residuals - np.median(residuals)))
        jitter_px = 1.4826 * mad_residuals
        jitter_in = jitter_px / px_per_inch
    else:
        jitter_in = 0.1  # Minimal fallback
    # Calculate trust ratio
    sway_jitter_ratio = abs(sway_inches) / max(jitter_in, 1e-6)
    trust_flag = "trusted" if sway_jitter_ratio >= 1.5 else "low-trust"
    # Normalize by stance width if using pixel fallback and world landmarks available
    normalized_sway = None
    stance_width_in = None
    if world_landmarks:
        stance_widths = []
        for f in setup_ok:
            if f in world_landmarks:
                try:
                    Lt_w, Rt_w = world_landmarks[f][31], world_landmarks[f][32]
                    toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3]))
                    stance_widths.append(float(toe_world_m * 39.3701))  # Convert to inches
                # NOTE(review): bare except hides all errors; prefer `except Exception`.
                except:
                    pass
        if stance_widths:
            stance_width_in = np.median(stance_widths)
            # Only normalize for plausible human stance widths (14-28 inches).
            if 14 <= stance_width_in <= 28:
                normalized_sway = sway_inches / stance_width_in
    # Enhanced pixel fallback logging matching main function
    norm_str = f"{normalized_sway:.3f}" if normalized_sway is not None else "NA"
    dbg(f"pixel_fallback: dx={sway_inches:.2f}in, norm={norm_str}, jitter={jitter_in:.2f}in, ratio={sway_jitter_ratio:.1f}, trust={trust_flag}")
    if stance_width_in:
        dbg(f"pixel_fallback: stance_width={stance_width_in:.1f}\", target_normalized≈0.16 for pros")
    return sway_inches
| def _subframe_impact_refinement(world_landmarks: Dict[int, List], impact_candidate: int) -> float: | |
| """ | |
| Refine impact timing to sub-frame precision using parabolic fit to hand speed | |
| Fits parabola around the impact candidate and returns fractional frame timestamp | |
| """ | |
| try: | |
| # Get frames around impact candidate for parabolic fit | |
| test_frames = [f for f in range(impact_candidate-2, impact_candidate+3) | |
| if f in world_landmarks] | |
| if len(test_frames) < 3: | |
| return float(impact_candidate) # Not enough data for refinement | |
| # Calculate hand speeds around impact | |
| speeds = [] | |
| frame_indices = [] | |
| for i in range(1, len(test_frames)): | |
| f_curr = test_frames[i] | |
| f_prev = test_frames[i-1] | |
| curr_frame = world_landmarks[f_curr] | |
| prev_frame = world_landmarks[f_prev] | |
| if (curr_frame and len(curr_frame) > 16 and | |
| prev_frame and len(prev_frame) > 16 and | |
| curr_frame[16] and prev_frame[16]): | |
| curr_wrist = curr_frame[16] | |
| prev_wrist = prev_frame[16] | |
| # Calculate 3D speed | |
| dx = curr_wrist[0] - prev_wrist[0] | |
| dy = curr_wrist[1] - prev_wrist[1] | |
| dz = curr_wrist[2] - prev_wrist[2] | |
| speed = math.sqrt(dx*dx + dy*dy + dz*dz) | |
| speeds.append(speed) | |
| frame_indices.append(f_curr) | |
| if len(speeds) < 3: | |
| return float(impact_candidate) | |
| # Fit parabola to speed curve: speed = a*t^2 + b*t + c | |
| # Find peak (maximum) of parabola | |
| t = np.array(frame_indices, dtype=float) | |
| s = np.array(speeds) | |
| # Polynomial fit (degree 2) | |
| coeffs = np.polyfit(t, s, 2) | |
| a, b, c = coeffs | |
| if a >= 0: # Parabola opens upward → no maximum | |
| return float(impact_candidate) | |
| # Maximum at t = -b/(2a) | |
| peak_frame_fractional = -b / (2*a) | |
| # Impact happens just before peak (typically 0.5-1.0 frames before) | |
| # Use the original logic of peak-1, but with fractional precision | |
| impact_fractional = peak_frame_fractional - 0.7 # Refined offset | |
| # Constrain to reasonable range around original estimate | |
| if abs(impact_fractional - impact_candidate) > 2.0: | |
| impact_fractional = float(impact_candidate) | |
| dbg(f"subframe_refinement - parabola_coeffs=[{a:.6f}, {b:.6f}, {c:.6f}]") | |
| dbg(f"subframe_refinement - peak_frame={peak_frame_fractional:.2f}, impact_refined={impact_fractional:.2f}") | |
| return impact_fractional | |
| except Exception as e: | |
| dbg(f"subframe_refinement - failed: {e}, using integer frame") | |
| return float(impact_candidate) | |
| def _address_frames(swing_phases: Dict[str, List]) -> List[int]: | |
| """Get address/setup frames""" | |
| return swing_phases.get("setup", [])[:10] | |
| def _impact_frame(world_landmarks: Dict[int, List], swing_phases: Dict[str, List]) -> Optional[int]: | |
| """Find impact using peak lead-hand speed - 1 frame with inline refinement""" | |
| downswing_frames = swing_phases.get("downswing", []) | |
| impact_frames = swing_phases.get("impact", []) | |
| analysis_frames = sorted(downswing_frames + impact_frames) | |
| if len(analysis_frames) < 5: | |
| return impact_frames[0] if impact_frames else None | |
| # Calculate hand speeds | |
| hand_speeds = [] | |
| valid_frames = [] | |
| for i in range(1, len(analysis_frames)): | |
| f_curr = analysis_frames[i] | |
| f_prev = analysis_frames[i-1] | |
| if f_curr in world_landmarks and f_prev in world_landmarks: | |
| curr_frame = world_landmarks[f_curr] | |
| prev_frame = world_landmarks[f_prev] | |
| if (curr_frame and len(curr_frame) > 16 and | |
| prev_frame and len(prev_frame) > 16): | |
| # Use lead hand (left wrist for right-handed golfer) | |
| wrist_idx = 15 # Lead wrist for right-handed | |
| curr_wrist = curr_frame[wrist_idx] | |
| prev_wrist = prev_frame[wrist_idx] | |
| if (curr_wrist and len(curr_wrist) >= 3 and | |
| prev_wrist and len(prev_wrist) >= 3): | |
| dx = curr_wrist[0] - prev_wrist[0] | |
| dy = curr_wrist[1] - prev_wrist[1] | |
| dz = curr_wrist[2] - prev_wrist[2] | |
| speed = math.sqrt(dx*dx + dy*dy + dz*dz) | |
| hand_speeds.append(speed) | |
| valid_frames.append(f_curr) | |
| if len(hand_speeds) >= 3: | |
| # Find peak speed frame | |
| max_speed_idx = np.argmax(hand_speeds) | |
| peak_speed_frame = valid_frames[max_speed_idx] | |
| # Initial estimate: peak speed - 1 frame | |
| impact_candidate = peak_speed_frame | |
| if max_speed_idx > 0: | |
| impact_candidate = valid_frames[max_speed_idx - 1] | |
| # Clamp to ±2 frames around original estimate | |
| original = valid_frames[max_speed_idx - 1] if max_speed_idx > 0 else peak_speed_frame | |
| impact_candidate = max(original - 2, min(original + 2, impact_candidate)) | |
| return impact_candidate | |
| return impact_frames[0] if impact_frames else None | |
| # Removed complex 2D fallback function - using simple direct measurement only | |
| # ================== HIP-SHOULDER SEPARATION METRIC ================== | |
| def _angle_deg(p_left: np.ndarray, p_right: np.ndarray) -> float: | |
| """ | |
| Angle of the line Left->Right relative to screen-horizontal, in degrees. | |
| Image coords: x right+, y down+. | |
| """ | |
| dx = float(p_right[0] - p_left[0]) | |
| dy = float(p_right[1] - p_left[1]) | |
| return np.degrees(np.arctan2(dy, dx)) | |
| def _wrap_deg(a: float) -> float: | |
| """Wrap to (-180, 180].""" | |
| a = (a + 180.0) % 360.0 - 180.0 | |
| return a | |
| def _span(p_left: np.ndarray, p_right: np.ndarray) -> float: | |
| """Euclidean span in pixels.""" | |
| return float(np.hypot(p_right[0] - p_left[0], p_right[1] - p_left[1])) | |
| def _trimmed_median(values: List[float], trim: float = 0.2) -> float: | |
| """Robust center estimate.""" | |
| if not values: | |
| return 0.0 | |
| arr = np.sort(np.asarray(values, dtype=float)) | |
| n = len(arr) | |
| k = int(np.floor(trim * n)) | |
| arr = arr[k: n - k] if n - 2*k > 0 else arr | |
| return float(np.median(arr)) | |
| def _circular_mean_deg(values: List[float]) -> float: | |
| """Circular mean for angle values to avoid wrap artifacts.""" | |
| if not values: | |
| return 0.0 | |
| a = np.radians(values) | |
| return float(np.degrees(np.arctan2(np.mean(np.sin(a)), np.mean(np.cos(a))))) | |
def estimate_global_roll_deg(
    lm_seq: np.ndarray,
    setup_idxs: List[int],
    use_eyes: bool = True
) -> float:
    """
    Estimate camera roll (degrees) from early setup frames.

    Only eye and shoulder lines contribute — hips are deliberately excluded
    so pelvis rotation cannot bias the estimate — and the collected angles
    are combined with a circular mean to avoid wrap artifacts (e.g. reading
    171 deg instead of -18 deg). The result is intended to be subtracted
    from line angles measured later in the swing.
    """
    collected: List[float] = []
    for frame_idx in setup_idxs:
        if frame_idx >= lm_seq.shape[0]:
            continue
        frame = lm_seq[frame_idx]
        try:
            if use_eyes and len(frame) > max(L_EYE, R_EYE):
                collected.append(_angle_deg(frame[L_EYE], frame[R_EYE]))
            if len(frame) > max(L_SHO, R_SHO):
                collected.append(_angle_deg(frame[L_SHO], frame[R_SHO]))
        except Exception:
            continue
    if not collected:
        return 0.0
    # Circular mean is order-independent, so interleaving eye/shoulder angles
    # yields the same result as grouping them.
    return _circular_mean_deg(collected)
def hip_shoulder_separation_deg(
    lm_seq: np.ndarray,
    idx: int,
    setup_idxs: List[int],
    handed: str = "R",
    min_span_px: float = 20.0,
    smooth_half_window: int = 2, # 5-frame median around idx
    apply_yaw_correction: bool = True
) -> Dict[str, float]:
    """
    Compute signed hip-shoulder separation at a given frame.
    Positive (right-handed) => hips more open than shoulders (hips leading).
    For left-handed, sign is flipped so positive still means hips leading.

    Features:
    - Temporal smoothing: median over ±smooth_half_window frames around idx
    - Yaw compensation: corrects for camera angle by comparing landmark spans
      between address and the target frame
    - Geometry confidence: flags when foreshortening may affect accuracy

    Returns: dict with:
        'hip_shoulder_sep_deg': corrected separation angle
        'hip_shoulder_sep_abs_deg': absolute value of separation
        'confidence': 0-1 quality of landmark detection
        'span_ratio': landmark span at idx vs address (foreshortening check)
        'geometry_ok': True if camera appears square to player
        'severe_foreshortening': True when span_ratio < 0.5
    NOTE(review): the degraded early return below carries only the first
    three keys; callers must use .get() with defaults for the rest.
    """
    T = lm_seq.shape[0]
    # Neighborhood for temporal median smoothing
    lo = max(0, idx - smooth_half_window)
    hi = min(T - 1, idx + smooth_half_window)
    window = range(lo, hi + 1)
    # Estimate roll from setup
    roll_deg = estimate_global_roll_deg(lm_seq, setup_idxs, use_eyes=True)
    hip_angles, sho_angles, confs = [], [], []
    for t in window:
        if t >= T:
            continue
        lm = lm_seq[t]
        if len(lm) <= max(L_SHO, R_SHO, L_HIP, R_HIP):
            continue
        Ls, Rs = lm[L_SHO], lm[R_SHO]
        Lh, Rh = lm[L_HIP], lm[R_HIP]
        # Confidence via spans
        span_sho = _span(Ls, Rs)
        span_hip = _span(Lh, Rh)
        good = float((span_sho >= min_span_px) and (span_hip >= min_span_px))
        confs.append(good)
        # Raw angles
        a_sho = _angle_deg(Ls, Rs) - roll_deg
        a_hip = _angle_deg(Lh, Rh) - roll_deg
        # Wrap both post de-roll
        a_sho = _wrap_deg(a_sho)
        a_hip = _wrap_deg(a_hip)
        sho_angles.append(a_sho)
        hip_angles.append(a_hip)
    if not sho_angles or not hip_angles:
        # NOTE(review): degraded return shape — omits span_ratio / geometry_ok /
        # severe_foreshortening (see docstring).
        return dict(hip_shoulder_sep_deg=0.0, hip_shoulder_sep_abs_deg=0.0, confidence=0.0)
    # Temporal median to reduce jitter
    a_sho_med = float(np.median(sho_angles))
    a_hip_med = float(np.median(hip_angles))
    conf = float(np.mean(confs)) # 0..1 proportion of frames with decent span
    # Separation: hips minus shoulders
    sep = _wrap_deg(a_hip_med - a_sho_med)
    # DEBUG PRINT: Add debugging info for the angles and separation
    # NOTE(review): unconditional print, not gated by the module's dbg()/VERBOSE
    # mechanism — presumably intentional during bring-up; confirm before release.
    print(f"[DBG] roll={roll_deg:.1f}°, a_hip_med={a_hip_med:.1f}°, a_sho_med={a_sho_med:.1f}°, sep={sep:.1f}°")
    # FORESHORTENING CHECK: Compare shoulder span from address to impact
    span_addr = None  # NOTE(review): assigned but never read anywhere below
    span_ratio = 1.0
    geom_ok = True
    severe_foreshortening = False
    if apply_yaw_correction and setup_idxs:
        try:
            # Calculate median shoulder and hip spans during address
            addr_sho_spans, addr_hip_spans = [], []
            for t in setup_idxs:
                if t < lm_seq.shape[0]:
                    lm = lm_seq[t]
                    if len(lm) > max(L_SHO, R_SHO):
                        addr_sho_spans.append(_span(lm[L_SHO], lm[R_SHO]))
                    if len(lm) > max(L_HIP, R_HIP):
                        addr_hip_spans.append(_span(lm[L_HIP], lm[R_HIP]))
            if addr_sho_spans:
                span_addr_sho = np.median(addr_sho_spans)
                span_addr_hip = np.median(addr_hip_spans) if addr_hip_spans else span_addr_sho
                # Current frame spans
                lm_current = lm_seq[idx] if idx < lm_seq.shape[0] else None
                if lm_current is not None and len(lm_current) > max(L_SHO, R_SHO):
                    span_imp_sho = _span(lm_current[L_SHO], lm_current[R_SHO])
                    span_imp_hip = _span(lm_current[L_HIP], lm_current[R_HIP]) if len(lm_current) > max(L_HIP, R_HIP) else span_imp_sho
                    # Calculate ratios and use the better (less compressed) one
                    ratio_sho = span_imp_sho / (span_addr_sho + 1e-6)
                    ratio_hip = span_imp_hip / (span_addr_hip + 1e-6)
                    span_ratio = max(ratio_sho, ratio_hip) # Use less-compressed ratio
                    print(f"[DBG] shoulder_span addr={span_addr_sho:.1f}px, impact={span_imp_sho:.1f}px, ratio_sho={ratio_sho:.2f}")
                    print(f"[DBG] hip_span addr={span_addr_hip:.1f}px, impact={span_imp_hip:.1f}px, ratio_hip={ratio_hip:.2f}")
                    print(f"[DBG] using_ratio={span_ratio:.2f} (better of sho/hip)")
                    # Apply clamping to realistic range (0.7-1.0) to avoid underestimation
                    ratio_clamped = min(1.0, max(0.7, span_ratio))
                    sep_raw = sep
                    sep = sep / ratio_clamped
                    print(f"[DBG] sep_raw={sep_raw:.1f}°, sep_corr={sep:.1f}° (ratio_clamped={ratio_clamped:.2f})")
                    # Geometry confidence flag - flag low-confidence results when span ratio <0.5
                    geom_ok = span_ratio >= 0.7 # reasonable geometry
                    severe_foreshortening = span_ratio < 0.5 # flag low-confidence results
        except Exception as e:
            print(f"[DBG] Foreshortening calculation failed: {e}")
    # Handedness normalization: keep "positive = hips leading" for both
    if handed.upper().startswith("L"):
        sep *= -1.0
    # Adjust confidence based on foreshortening severity
    final_conf = conf
    if severe_foreshortening:
        final_conf *= 0.2 # Very low confidence for severe foreshortening (span ratio <0.5)
    elif not geom_ok:
        final_conf *= 0.6 # Reduced confidence for geometry issues (span ratio <0.7)
    return dict(
        hip_shoulder_sep_deg=float(sep),
        hip_shoulder_sep_abs_deg=float(abs(sep)),
        confidence=float(final_conf),
        span_ratio=float(span_ratio),
        geometry_ok=bool(geom_ok),
        severe_foreshortening=bool(severe_foreshortening)
    )
def metric_at_address_top_impact(
    lm_seq: np.ndarray,
    address_idxs: List[int],
    top_idx: Optional[int],
    impact_idx: int,
    handed: str = "R"
) -> Dict[str, Dict[str, float]]:
    """
    Evaluate hip-shoulder separation at the three key swing positions.

    'address' is measured at the median frame of address_idxs, 'top' only
    when top_idx is provided, and 'impact' at impact_idx. The address window
    doubles as the setup reference (roll estimation + smoothing) for every
    measurement.
    address_idxs: e.g., the first 8-12 frames while static at setup.
    """
    results: Dict[str, Dict[str, float]] = {}
    median_addr = int(np.median(address_idxs))
    results["address"] = hip_shoulder_separation_deg(
        lm_seq, median_addr, setup_idxs=address_idxs, handed=handed
    )
    if top_idx is not None:
        results["top"] = hip_shoulder_separation_deg(
            lm_seq, top_idx, setup_idxs=address_idxs, handed=handed
        )
    results["impact"] = hip_shoulder_separation_deg(
        lm_seq, impact_idx, setup_idxs=address_idxs, handed=handed
    )
    return results
def calculate_hip_shoulder_separation_at_impact(
    pose_data: Dict[int, List],
    swing_phases: Dict[str, List],
    handedness: str = 'right'
) -> Dict[str, float]:
    """
    Compute hip-shoulder separation at impact from raw per-frame pose data.

    Packs the landmark lists into a dense [T, 33, 2] array (MediaPipe
    layout), picks the address window (first <=10 setup frames) and the
    first labeled impact frame, then delegates to
    hip_shoulder_separation_deg with 5-frame temporal smoothing.

    Args:
        pose_data: frame index -> pose keypoints ([x, y, ...] per landmark).
        swing_phases: phase name -> list of frame indices.
        handedness: 'right' or 'left' handed player.

    Returns:
        Dict with hip_shoulder_sep_deg, hip_shoulder_sep_abs_deg and
        confidence (None values / 0.0 confidence when not computable).
    """
    if not pose_data:
        return {"hip_shoulder_sep_deg": None, "hip_shoulder_sep_abs_deg": None, "confidence": 0.0}
    # Dense [T, 33, 2] array indexed by frame number.
    last_frame = max(pose_data.keys())
    lm_seq = np.zeros((last_frame + 1, 33, 2), dtype=float)
    for frame_idx, landmarks in pose_data.items():
        if not landmarks or len(landmarks) < 25:  # need at least hip landmarks
            continue
        for lm_i, lm in enumerate(landmarks):
            if lm_i < 33 and lm and len(lm) >= 2:
                lm_seq[frame_idx, lm_i, 0] = lm[0]  # x
                lm_seq[frame_idx, lm_i, 1] = lm[1]  # y
    setup_frames = swing_phases.get("setup", [])
    impact_frames = swing_phases.get("impact", [])
    if not setup_frames or not impact_frames:
        dbg("[HIP-SHO] Missing setup or impact frames")
        return {"hip_shoulder_sep_deg": None, "hip_shoulder_sep_abs_deg": None, "confidence": 0.0}
    # Address window: up to the first 10 setup frames.
    address_idxs = setup_frames[:min(10, len(setup_frames))]
    impact_idx = impact_frames[0]
    handed = "R" if handedness.lower().startswith('r') else "L"
    try:
        result = hip_shoulder_separation_deg(
            lm_seq=lm_seq,
            idx=impact_idx,
            setup_idxs=address_idxs,
            handed=handed,
            smooth_half_window=2  # 5-frame median around impact (±2 frames)
        )
        # Annotate the debug line with any geometry caveats.
        if result.get('severe_foreshortening', False):
            geom_warning = " (severely underest. - severe foreshortening)"
        elif not result.get('geometry_ok', True):
            geom_warning = " (underest. likely; yaw/foreshortening)"
        else:
            geom_warning = ""
        dbg(f"[HIP-SHO] Impact separation: {result['hip_shoulder_sep_deg']:.1f}°, confidence: {result['confidence']:.2f}, span_ratio: {result.get('span_ratio', 1.0):.2f}{geom_warning}")
        return result
    except Exception as e:
        dbg(f"[HIP-SHO] Calculation failed: {e}")
        return {"hip_shoulder_sep_deg": None, "hip_shoulder_sep_abs_deg": None, "confidence": 0.0}
| # ================== END HIP-SHOULDER SEPARATION METRIC ================== | |
def calculate_hip_sway_at_top(pose_data: Dict[int, List], swing_phases: Dict[str, List],
                              world_landmarks: Optional[Dict[int, List]] = None) -> Union[float, None]:
    """Hip sway (inches) of the pelvis center from address to the top of backswing.

    Works in a ground-plane frame obtained from a foot-only homography; the
    forward axis is perpendicular to the toe line, sign-disambiguated from
    the address->impact pelvis motion. Calibration to inches uses the world
    toe distance when world_landmarks are available, with a pixel-based
    fallback. Falls back to a simple pixel measurement when no homography
    can be built, and returns None whenever required landmarks/phases are
    missing or the geometry is judged unreliable.

    Args:
        pose_data: frame index -> image-space landmarks [x, y, vis].
        swing_phases: phase name -> list of frame indices.
        world_landmarks: optional frame index -> MediaPipe world landmarks
            (meters); used for inch calibration and yaw correction.

    Returns:
        Signed hip sway in inches (positive = toward target), or None.
    """
    # Apply additional temporal smoothing (Savitzky-Golay or EWMA) before measurement
    sm = smooth_landmarks(pose_data, alpha=0.25) # Stronger smoothing for hip sway
    setup_frames = _pick_stable_setup_frames(sm, swing_phases, max_frames=6)
    if not setup_frames: return None
    backswing_frames = swing_phases.get("backswing", [])
    if not backswing_frames: return None
    top_frame = _pick_top_by_wrist_height(sm, backswing_frames) or backswing_frames[-1]
    H, ok, H_frame = _build_foot_only_homography(sm, setup_frames)
    if not ok or H is None:
        # simple pixel fallback with world toes scale if present
        return _simple_pixel_sway_fallback(sm, setup_frames, top_frame, swing_phases, world_landmarks)
    import cv2  # NOTE(review): redundant — cv2 is already imported at module top
    def warp2(pts):
        # Project image points into the ground plane via the homography.
        A = np.array(pts, dtype=np.float32).reshape(1, -1, 2)
        return cv2.perspectiveTransform(A, H)[0]
    # forward axis = perp to toe line in ground plane
    toe_vecs = []
    for f in setup_frames:
        lm = sm.get(f)
        if not lm or not lm[31] or not lm[32]: continue
        g = warp2([[lm[31][0], lm[31][1]], [lm[32][0], lm[32][1]]])
        v = g[1] - g[0]
        n = np.linalg.norm(v)
        if n > 1e-6: toe_vecs.append(v/n)
    if not toe_vecs: return None
    toe_hat = np.median(np.stack(toe_vecs), axis=0); toe_hat /= (np.linalg.norm(toe_hat)+1e-9)
    fwd_hat = np.array([-toe_hat[1], toe_hat[0]])
    fwd_hat /= (np.linalg.norm(fwd_hat)+1e-9)
    # sign disambiguation with addr→impact pelvis motion (optional but stable)
    imp = swing_phases.get("impact", [])
    cand = imp[0] if imp else backswing_frames[-1]
    def pelvis_ctr_g(f):
        # Ground-plane midpoint of the two hip landmarks (23/24), or None.
        lm = sm.get(f);
        if not lm or not lm[23] or not lm[24]: return None
        g = warp2([[lm[23][0], lm[23][1]], [lm[24][0], lm[24][1]]]);
        return (g[0]+g[1])/2
    addr_ctrs = [pelvis_ctr_g(f) for f in setup_frames]; addr_ctrs = [c for c in addr_ctrs if c is not None]
    imp_ctr = pelvis_ctr_g(cand)
    if addr_ctrs and imp_ctr is not None:
        addr_ctr = np.median(np.stack(addr_ctrs), axis=0)
        if float(np.dot(imp_ctr-addr_ctr, fwd_hat)) < 0:
            dbg("[SWAY] Flipped fwd_hat based on addr→impact pelvis motion")
            fwd_hat = -fwd_hat
    # Establish robust address baseline to stop drift from inflating jitter
    # NOTE(review): np.stack raises if addr_ctrs is empty — there is no guard
    # here; TODO confirm upstream guarantees at least one valid pelvis center.
    setup_ctr = np.median(np.stack(addr_ctrs), axis=0)
    # Robust baseline with detrending: use tight window around address
    addr_ctrs_array = np.stack(addr_ctrs)
    addr_x_positions = addr_ctrs_array[:, 0] # X coordinates only
    # Light detrend to remove slow drift
    t = np.arange(len(addr_x_positions))
    if len(addr_x_positions) >= 2:
        m, b = np.polyfit(t, addr_x_positions, 1) # Linear fit
        residuals = addr_x_positions - (m * t + b)
        # Robust jitter using MAD (Median Absolute Deviation)
        mad_residuals = np.median(np.abs(residuals - np.median(residuals)))
        setup_jitter_px = 1.4826 * mad_residuals # MAD to std conversion
        addr_baseline_px = b # Robust address baseline
    else:
        setup_jitter_px = 0.0
        addr_baseline_px = addr_x_positions[0] if len(addr_x_positions) > 0 else setup_ctr[0]
    dbg(f"[SWAY] Robust baseline: jitter_px={setup_jitter_px:.2f} (was std-based), addr_baseline={addr_baseline_px:.1f}")
    setup_jitter = setup_jitter_px # Use robust jitter
    top_ctr = pelvis_ctr_g(top_frame)
    if top_ctr is None: return None
    # Apply jitter-aware correction - reduce small movements that may be noise
    raw_delta = top_ctr - setup_ctr
    if np.linalg.norm(raw_delta) < setup_jitter * 1.5:
        dbg(f"[SWAY] Movement ({np.linalg.norm(raw_delta):.2f}) < 1.5x setup jitter ({setup_jitter:.2f}), applying correction")
        # Partial correction for movements smaller than jitter threshold
        correction_factor = max(0.0, (np.linalg.norm(raw_delta) - setup_jitter) / np.linalg.norm(raw_delta)) if np.linalg.norm(raw_delta) > 0 else 0.0
        corrected_delta = raw_delta * correction_factor
        top_ctr = setup_ctr + corrected_delta
    # Validate vertical drift but DO NOT repick top to minimize it
    TOP_DRIFT_GU_WARN = 3.0 # warn if pelvis vertical drift > 3 gu, but DO NOT change top
    TOP_DRIFT_GU_REJECT = 6.0 # only reject if absurd
    vertical_drift = abs(top_ctr[1] - setup_ctr[1]) # Ground-plane Y difference
    if vertical_drift > TOP_DRIFT_GU_REJECT:
        dbg(f"[SWAY] vertical drift {vertical_drift:.1f} gu -> unreliable sway")
        return None
    elif vertical_drift > TOP_DRIFT_GU_WARN:
        dbg(f"[SWAY] vertical drift {vertical_drift:.1f} gu (warn)")
        # DO NOT repick top based on vertical drift - top must be picked by club kinematics
    # Ground-plane calibration: gu_per_inch stabilization with consistency check
    gu_per_inch_list = []
    use_pixel_fallback = False
    for k in setup_frames:
        if world_landmarks and k in world_landmarks:
            try:
                # 1) World toe distance (meters -> inches)
                Lt_w, Rt_w = world_landmarks[k][31], world_landmarks[k][32]
                toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3]))
                toe_world_in = float(toe_world_m * 39.3701)
                # 2) Ground-plane toe distance (same frame, via homography)
                lmK = sm.get(k)
                if lmK and lmK[31] and lmK[32]:
                    toe_g = warp2([
                        [lmK[31][0], lmK[31][1]],
                        [lmK[32][0], lmK[32][1]],
                    ]) # shape (2,2)
                    toe_g_dist = float(np.linalg.norm(toe_g[0] - toe_g[1])) # ground units
                    if toe_g_dist > 1e-3 and toe_world_in > 1e-3:
                        gu_per_in = toe_g_dist / max(toe_world_in, 1e-6) # ground-units per inch
                        # Homography plausibility check: stance width should be ~8–30 inches
                        if 8.0 <= toe_world_in <= 30.0 and 0.1 <= gu_per_in <= 10.0:
                            gu_per_inch_list.append(gu_per_in)
                            dbg(f"[SWAY] frame {k}: toe_g={toe_g_dist:.2f} gu, toe_world={toe_world_in:.2f} in, gu_per_in={gu_per_in:.3f}")
                        else:
                            dbg(f"[SWAY] implausible frame {k}: toe_world_in={toe_world_in:.2f}, gu_per_in={gu_per_in:.3f}")
            except Exception as e:
                dbg(f"[SWAY] calibration failed for frame {k}: {e}")
    # Reject frames with inconsistent calibration (>10% deviation from median)
    if len(gu_per_inch_list) > 1:
        initial_median = np.median(gu_per_inch_list)
        consistent_values = []
        for val in gu_per_inch_list:
            deviation_pct = abs(val - initial_median) / initial_median * 100
            if deviation_pct <= 10.0: # Within 10% of median
                consistent_values.append(val)
            else:
                dbg(f"[SWAY] Rejecting inconsistent gu_per_in={val:.3f} (deviation={deviation_pct:.1f}%)")
        if consistent_values:
            gu_per_inch_list = consistent_values
            # NOTE(review): this log runs AFTER the reassignment above, so both
            # counts come from the same list and the ratio always reads n/n.
            dbg(f"[SWAY] Kept {len(consistent_values)}/{len(gu_per_inch_list)} consistent calibration values")
    if gu_per_inch_list:
        gu_per_in = np.median(gu_per_inch_list)
        gu_per_in = float(np.clip(gu_per_in, 0.1, 10.0)) # ground-plane appropriate range
        use_pixel_fallback = False
        dbg(f"[SWAY] using ground calibration: median gu_per_in={gu_per_in:.3f} from {len(gu_per_inch_list)} frames")
    else:
        dbg("[SWAY] no consistent ground calibration; will try pixel fallback")
        use_pixel_fallback = True
        gu_per_in = 1.0
    # Option B pixel fallback with consistency check
    if use_pixel_fallback and world_landmarks and setup_frames:
        px_per_inch_list = []
        for k in setup_frames:
            if k in world_landmarks:
                try:
                    Lt_w, Rt_w = world_landmarks[k][31], world_landmarks[k][32]
                    toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3]))
                    toe_world_in = float(toe_world_m * 39.3701)
                    lmK = sm.get(k)
                    if lmK and lmK[31] and lmK[32]:
                        # Image pixel distance
                        toe_px = float(np.linalg.norm(np.array(lmK[31][:2]) - np.array(lmK[32][:2])))
                        if toe_px > 1e-3 and toe_world_in > 1e-3:
                            px_per_in = toe_px / max(toe_world_in, 1e-6)
                            if 3.0 <= px_per_in <= 60.0: # image pixels appropriate range
                                px_per_inch_list.append(px_per_in)
                                dbg(f"[SWAY] FALLBACK frame {k}: toe_px={toe_px:.1f}, toe_world_in={toe_world_in:.2f}, px_per_in={px_per_in:.2f}")
                except:
                    pass
        # Apply same consistency check to pixel fallback
        if len(px_per_inch_list) > 1:
            initial_median = np.median(px_per_inch_list)
            consistent_px_values = []
            for val in px_per_inch_list:
                deviation_pct = abs(val - initial_median) / initial_median * 100
                if deviation_pct <= 10.0: # Within 10% of median
                    consistent_px_values.append(val)
                else:
                    dbg(f"[SWAY] Rejecting inconsistent px_per_in={val:.2f} (deviation={deviation_pct:.1f}%)")
            px_per_inch_list = consistent_px_values
        if px_per_inch_list:
            gu_per_in = np.median(px_per_inch_list) # reuse variable name for simplicity
            dbg(f"[SWAY] using pixel fallback: median px_per_in={gu_per_in:.2f} from {len(px_per_inch_list)} frames")
        else:
            gu_per_in = 1.0
            dbg("[SWAY] all calibration failed, using unit scale")
    inch_scale = 1.0 / gu_per_in if gu_per_in > 0 else 1.0
    # Measure ∆x in ground units end-to-end for consistency
    # Convert pelvis positions to ground units
    setup_ctr_gu = np.array([addr_baseline_px, setup_ctr[1]]) # Use robust baseline
    top_ctr_gu = top_ctr
    # Two-stage denoising: apply rolling median + EWMA smoothing to top position
    # (Note: setup already detrended above)
    if len(addr_ctrs) >= 3:
        # Simple moving median for top position stability (if multiple candidates available)
        top_candidates = []
        # NOTE(review): `f_nearby in range(len(pose_data))` assumes frame keys are
        # contiguous 0..T-1; with sparse keys this silently skips valid frames —
        # TODO confirm upstream guarantees contiguous indexing.
        for f_nearby in range(max(setup_frames[0], top_frame-2), min(setup_frames[-1], top_frame+3)):
            if f_nearby in range(len(pose_data)) and f_nearby != top_frame:
                nearby_ctr = pelvis_ctr_g(f_nearby)
                if nearby_ctr is not None:
                    top_candidates.append(nearby_ctr)
        if top_candidates and len(top_candidates) >= 2:
            top_positions = np.stack([top_ctr] + top_candidates)
            # Median filter followed by light smoothing
            top_x_values = top_positions[:, 0]
            top_x_median = np.median(top_x_values)
            # EWMA with conservative alpha on the median-filtered result
            # NOTE(review): alpha is defined but never applied below — the EWMA
            # stage appears unimplemented.
            alpha = 0.2
            top_ctr_gu = np.array([top_x_median, top_ctr[1]])
            dbg(f"[SWAY] Two-stage denoising: raw_top_x={top_ctr[0]:.2f}, median_filtered={top_x_median:.2f}")
    # Calculate sway in ground units, then convert to inches
    if top_ctr_gu is not None and setup_ctr_gu is not None and fwd_hat is not None:
        sway_gu = float(np.dot(top_ctr_gu - setup_ctr_gu, fwd_hat))
        hip_sway_in = sway_gu * inch_scale
    else:
        dbg("[SWAY] Missing required data for sway calculation")
        return None
    # Convert jitter to inches using same scale
    setup_jitter_in = setup_jitter * inch_scale if setup_jitter is not None else 0.0
    # Lightweight yaw/foreshortening correction using hip span ratio
    ratio_hip = 1.0 # Default if no correction available
    if world_landmarks and setup_frames:
        try:
            # Calculate hip span ratio from setup to top for yaw correction
            setup_hip_spans = []
            for f in setup_frames[:3]: # Use first few setup frames
                if f in world_landmarks:
                    wf = world_landmarks[f]
                    if len(wf) > 24 and wf[23] and wf[24]: # L_HIP, R_HIP
                        span = np.linalg.norm(np.array(wf[24][:3]) - np.array(wf[23][:3]))
                        if 0.15 <= span <= 0.6: # Reasonable hip span in meters
                            setup_hip_spans.append(span)
            if setup_hip_spans and top_frame in world_landmarks:
                wf_top = world_landmarks[top_frame]
                if len(wf_top) > 24 and wf_top[23] and wf_top[24]:
                    top_hip_span = np.linalg.norm(np.array(wf_top[24][:3]) - np.array(wf_top[23][:3]))
                    setup_hip_span = np.median(setup_hip_spans)
                    ratio_hip = top_hip_span / setup_hip_span
                    ratio_hip = np.clip(ratio_hip, 0.80, 1.00) # Conservative clamp
                    dbg(f"[SWAY] Hip span ratio: {ratio_hip:.3f} (setup={setup_hip_span:.3f}m, top={top_hip_span:.3f}m)")
        except Exception as e:
            dbg(f"[SWAY] Hip span ratio calculation failed: {e}")
    # Apply yaw correction
    hip_sway_in_corrected = hip_sway_in / ratio_hip
    dbg(f"[SWAY] Yaw correction: raw={hip_sway_in:.2f}\", corrected={hip_sway_in_corrected:.2f}\" (ratio={ratio_hip:.3f})")
    # Use corrected value
    hip_sway_in = hip_sway_in_corrected
    # STANCE WIDTH NORMALIZATION: Calculate stance width for normalized sway
    stance_width_in = None
    if gu_per_inch_list and world_landmarks:
        # Use the same calibration frames to get stance width
        stance_widths = []
        for k in setup_frames:
            if k in world_landmarks:
                try:
                    Lt_w, Rt_w = world_landmarks[k][31], world_landmarks[k][32]
                    toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3]))
                    stance_widths.append(float(toe_world_m * 39.3701)) # Convert to inches
                except:
                    pass
        if stance_widths:
            stance_width_in = np.median(stance_widths)
    # Normalize and log (don't change return type)
    normalized_sway = None
    if stance_width_in and 14 <= stance_width_in <= 28: # Reasonable stance width range
        normalized_sway = hip_sway_in / stance_width_in
    # Calculate trust ratio with robust jitter
    sway_jitter_ratio = abs(hip_sway_in) / max(setup_jitter_in, 1e-6)
    trust_flag = "trusted" if sway_jitter_ratio >= 1.5 else "low-trust"
    # Enhanced debug output with all the key metrics
    norm_str = f"{normalized_sway:.3f}" if normalized_sway is not None else "NA"
    dbg(f"[SWAY] dx={hip_sway_in:.2f}in, norm={norm_str}, jitter={setup_jitter_in:.2f}in, ratio={sway_jitter_ratio:.1f}")
    # Safe formatting for sway_gu which might be None in some edge cases
    sway_gu_str = f"{sway_gu:.2f}" if sway_gu is not None else "None"
    gu_per_in_str = f"{gu_per_in:.3f}" if gu_per_in is not None else "None"
    dbg(f"[SWAY] addr→top pelvis Δx={sway_gu_str}gu @ gu_per_in={gu_per_in_str}, trust={trust_flag}")
    if stance_width_in:
        dbg(f"[SWAY] stance_width={stance_width_in:.1f}\", target_normalized≈0.16 for pros")
    # Trust but verify debug printout
    hip_sway_str = f"{hip_sway_in:.1f}" if hip_sway_in is not None else "None"
    gu_per_in_final_str = f"{gu_per_in:.3f}" if gu_per_in is not None else "None"
    dbg(f"[SWAY] final hip_sway_in={hip_sway_str}\" with gu_per_in={gu_per_in_final_str}")
    # Return signed, un-clipped value to preserve direction; clip only for UI bands if desired
    return float(hip_sway_in)
| def _shaft_axis_hat(lm, handedness='right'): | |
| """Shaft axis proxy: line through wrists (trail->lead). Deterministic and stable.""" | |
| if not lm: return None | |
| L_WR, R_WR = 15, 16 | |
| if not lm[L_WR] or not lm[R_WR] or lm[L_WR][2] < 0.3 or lm[R_WR][2] < 0.3: | |
| return None | |
| # Build trail→lead vector based on handedness | |
| if handedness == 'right': | |
| # Right-handed: trail=16 (right wrist), lead=15 (left wrist) | |
| trail_wrist, lead_wrist = R_WR, L_WR | |
| else: | |
| # Left-handed: trail=15 (left wrist), lead=16 (right wrist) | |
| trail_wrist, lead_wrist = L_WR, R_WR | |
| v = np.array([lm[lead_wrist][0] - lm[trail_wrist][0], lm[lead_wrist][1] - lm[trail_wrist][1]], float) # trail->lead | |
| n = np.linalg.norm(v) | |
| return v / n if n > 1e-6 else None | |
def _hinge2d_shaft(lm, handed='right', roll_deg=0.0):
    """Lead-forearm only wrist hinge = angle between lead forearm and shaft vectors.
    Softened gating and consistent lead-only measurement.

    Args:
        lm: landmark list, entries [x, y, visibility] per MediaPipe index.
        handed: 'right' or 'left'; selects the lead elbow/wrist pair.
        roll_deg: camera roll (degrees) removed from both vectors before
            measuring.

    Returns:
        The acute forearm-shaft angle in degrees (always in [0, 90] because
        abs(cos) is used), or None when required landmarks are missing /
        low-visibility or a vector degenerates.
    """
    if not lm: return None
    # Lead forearm only - consistent landmark selection
    EL, WR = (13, 15) if handed=='right' else (14, 16) # lead elbow/wrist
    if not lm[EL] or not lm[WR] or lm[EL][2] < 0.4 or lm[WR][2] < 0.4: # Relaxed visibility threshold
        return None
    # Lead forearm vector from elbow to wrist
    forearm_vec = np.array([lm[WR][0] - lm[EL][0], lm[WR][1] - lm[EL][1]], float)
    # Shaft vector using both wrists for more stable direction
    shaft_vec = _shaft_axis_hat(lm, handed)
    if shaft_vec is None:
        # Fallback: use wrist-to-wrist vector as shaft proxy
        # NOTE(review): _shaft_axis_hat already returns None when either wrist
        # has visibility < 0.3, and this fallback requires >= 0.4; the only
        # remaining case it covers is a degenerate (near-zero) wrist-to-wrist
        # vector, which the shaft_norm check below rejects anyway. Effectively
        # dead code — confirm before removing.
        if lm[15] and lm[16] and lm[15][2] >= 0.4 and lm[16][2] >= 0.4:
            # Build trail→lead vector based on handedness for consistency
            if handed == 'right':
                trail_wrist, lead_wrist = 16, 15
            else:
                trail_wrist, lead_wrist = 15, 16
            shaft_vec = np.array([lm[lead_wrist][0] - lm[trail_wrist][0], lm[lead_wrist][1] - lm[trail_wrist][1]], float)
        else:
            return None
    # De-roll both vectors using the provided roll correction
    cr, sr = math.cos(math.radians(-roll_deg)), math.sin(math.radians(-roll_deg))
    def deroll(v): return np.array([cr*v[0] - sr*v[1], sr*v[0] + cr*v[1]], float)
    forearm_vec = deroll(forearm_vec)
    shaft_vec = deroll(shaft_vec)
    # Normalize vectors
    forearm_norm = np.linalg.norm(forearm_vec)
    shaft_norm = np.linalg.norm(shaft_vec)
    if forearm_norm < 1e-6 or shaft_norm < 1e-6:
        return None
    # Calculate angle between lead forearm and shaft
    cos_angle = float(np.clip(np.dot(forearm_vec, shaft_vec) / (forearm_norm * shaft_norm), -1.0, 1.0))
    angle_between = math.degrees(math.acos(abs(cos_angle))) # Always positive angle
    # Report acute angle (angle between forearm and shaft) instead of obtuse
    # This puts results in the expected pro range (~50-70° acute)
    hinge_acute = angle_between
    hinge_obtuse = 180.0 - angle_between # Traditional radial deviation for debugging
    # Debug logging: keep both for debugging but standardize to acute
    dbg(f"[HINGE] acute={hinge_acute:.1f}°, obtuse={hinge_obtuse:.1f}° (reporting acute)")
    return float(hinge_acute)
| def _pelvis_ctr_ground_series(sm, frames, warp2): | |
| """Get pelvis centers in ground plane for given frames""" | |
| ctrs = [] | |
| for f in frames: | |
| lm = sm.get(f) | |
| if not lm or not lm[23] or not lm[24]: | |
| ctrs.append(None) | |
| continue | |
| g = warp2([[lm[23][0], lm[23][1]], [lm[24][0], lm[24][1]]]) | |
| ctrs.append(((g[0]+g[1])/2).astype(float)) | |
| return ctrs | |
def _refine_top_by_lat_vel(sm, backswing_frames, warp2, target_hat_g, init_top):
    """Refine the top-of-backswing frame via the lateral pelvis-velocity zero crossing.

    Projects the ground-plane pelvis center onto the target axis across the
    backswing, smooths the series, and returns the frame near `init_top` whose
    lateral velocity magnitude is minimal (the change of direction). Falls back
    to `init_top` when fewer than 5 frames have usable pelvis landmarks.
    """
    pelvis_ctrs = _pelvis_ctr_ground_series(sm, backswing_frames, warp2)
    usable = [i for i, c in enumerate(pelvis_ctrs) if c is not None]
    if len(usable) < 5:
        return init_top  # not enough samples to estimate a velocity profile
    # Signed displacement of the pelvis center along the target direction.
    lateral = np.array([float(np.dot(c, target_hat_g)) if c is not None else np.nan
                        for c in pelvis_ctrs])
    # 5-point box smoothing; NaN gaps are zero-filled before convolving.
    kernel = np.ones(5) / 5
    lateral_smooth = np.convolve(np.nan_to_num(lateral), kernel, mode='same')
    lateral_vel = np.gradient(lateral_smooth)
    # Search a ±4-frame window around the initial top estimate for the point
    # where |velocity| is smallest (zero-cross / direction change).
    anchor = backswing_frames.index(init_top) if init_top in backswing_frames else len(backswing_frames) - 1
    search = range(max(0, anchor - 4), min(len(lateral_vel), anchor + 5))
    refined = min(search, key=lambda i: abs(lateral_vel[i]))
    return backswing_frames[refined]
| def _point_ground(frame_idx, sm, warp2, i): | |
| """Get ground-plane position of landmark i in frame frame_idx""" | |
| lm = sm.get(frame_idx) | |
| if not lm or not lm[i]: return None | |
| return warp2([[lm[i][0], lm[i][1]]])[0].astype(float) | |
| def _pick_p3_lead_arm_parallel(sm, backswing_frames, handed='right', tol_deg=15.0): | |
| """Pick the backswing frame where the lead shoulder→lead wrist vector is most horizontal.""" | |
| L_SHO, WR = (11, 15) if handed=='right' else (12, 16) | |
| best = None | |
| best_abs = 1e9 | |
| for f in backswing_frames: | |
| lm = sm.get(f) | |
| if not lm or not lm[L_SHO] or not lm[WR]: | |
| continue | |
| if lm[L_SHO][2] < 0.5 or lm[WR][2] < 0.5: | |
| continue | |
| dx = lm[WR][0] - lm[L_SHO][0] | |
| dy = lm[WR][1] - lm[L_SHO][1] | |
| if abs(dx) + abs(dy) < 1e-6: | |
| continue | |
| ang = abs(math.degrees(math.atan2(dy, dx))) # 0° is perfectly horizontal | |
| if ang < best_abs: | |
| best_abs, best = ang, f | |
| return best | |
def _pick_top_by_wrist_height(sm, backswing, handed='right', min_vis=0.7):
    """Pick the top-of-backswing frame as the one with the highest lead wrist.

    Scans `backswing` frames, rejecting those whose lead wrist/shoulder are
    below `min_vis` visibility or whose shoulder->wrist line is steeper than
    120° from horizontal; gives up after 10 consecutive rejects. Among
    height-similar candidates, prefers the flattest shoulder-wrist geometry.
    Returns the chosen frame index, or None when no frame qualifies.
    """
    WR = 15 if handed=='right' else 16  # lead wrist landmark index
    SH = 11 if handed=='right' else 12 # lead shoulder
    best, best_y = None, 1e9
    candidates = []  # (frame, wrist_y, shoulder-wrist angle) for post-validation
    consecutive_rejects = 0
    max_consecutive_rejects = 10 # Stop after N consecutive rejects to reduce spam
    for f in backswing:
        lm = sm.get(f)
        # Missing frame/landmarks or low visibility -> reject
        if not lm or not lm[WR] or not lm[SH] or lm[WR][2] < min_vis or lm[SH][2] < min_vis:
            consecutive_rejects += 1
            if consecutive_rejects >= max_consecutive_rejects:
                # NOTE(review): despite the message, `break` ends the scan here
                # rather than actually jumping ahead.
                dbg(f"[WRIST] Skipping {max_consecutive_rejects}+ consecutive bad frames, jumping ahead")
                break
            continue
        # Reject frames with shoulder-to-wrist line near vertical (club laid off/occluded)
        # Calculate angle of shoulder-to-wrist line from horizontal
        dx = lm[WR][0] - lm[SH][0]
        dy = lm[WR][1] - lm[SH][1]
        if abs(dx) < 1e-6: # Perfectly vertical
            consecutive_rejects += 1
            if consecutive_rejects >= max_consecutive_rejects:
                dbg(f"[WRIST] Skipping {max_consecutive_rejects}+ consecutive steep frames, jumping ahead")
                break
            continue
        angle_from_horizontal = abs(math.degrees(math.atan2(dy, dx)))
        # Unified gating constant - use everywhere that gates by shoulder-wrist angle
        SHO_WRIST_STEEP_MAX = 120.0 # was ~105 in some functions
        if angle_from_horizontal > SHO_WRIST_STEEP_MAX:
            consecutive_rejects += 1
            if consecutive_rejects >= max_consecutive_rejects:
                dbg(f"[WRIST] Skipping {max_consecutive_rejects}+ consecutive steep frames, jumping ahead")
                break
            # Only log first few rejects to reduce spam
            if consecutive_rejects <= 3:
                dbg(f"[WRIST] Rejecting frame {f}: shoulder-wrist angle {angle_from_horizontal:.1f}° too steep")
            continue
        # Found a good frame, reset consecutive reject counter
        consecutive_rejects = 0
        y = lm[WR][1] # smaller y = higher hand (screen coords)
        candidates.append((f, y, angle_from_horizontal))
        if y < best_y:
            best_y, best = y, f
    # Additional validation: ensure chosen frame is not an outlier
    if best is not None and candidates:
        # Check if there are other reasonable candidates nearby in height
        height_tolerance = 20 # pixels
        reasonable_candidates = [c for c in candidates if abs(c[1] - best_y) <= height_tolerance]
        if len(reasonable_candidates) > 1:
            # Among height-similar candidates, prefer one with better shoulder-wrist geometry
            reasonable_candidates.sort(key=lambda x: x[2]) # Sort by angle (smaller is better)
            best_candidate = reasonable_candidates[0]
            best = best_candidate[0]
            dbg(f"[WRIST] Selected frame {best} with angle {best_candidate[2]:.1f}° from {len(reasonable_candidates)} candidates")
    return best
def _pick_top_frame_hinge(sm, backswing_frames, handed='right', roll_deg=0.0):
    """Select the backswing frame with maximal wrist hinge.

    Gates out frames with poor landmark visibility, near-vertical or overly
    steep shoulder→wrist geometry, or implausible hinge values, then returns
    the frame whose acute hinge angle is largest. Returns None if nothing
    qualifies.
    """
    if not backswing_frames:
        return None
    sho_idx = 11 if handed == 'right' else 12  # lead shoulder
    wr_idx = 15 if handed == 'right' else 16   # lead wrist
    # Restrict to roughly the P3→P4 window: the final 70% of the backswing.
    window = backswing_frames[max(0, int(0.3 * len(backswing_frames))):]
    SHO_WRIST_STEEP_MAX = 120.0  # unified steepness gate (degrees from horizontal)
    best_frame, best_hinge = None, None
    for f in window:
        lm = sm.get(f)
        if not lm or not lm[wr_idx] or not lm[sho_idx] or lm[wr_idx][2] < 0.7 or lm[sho_idx][2] < 0.7:
            continue
        dx = lm[wr_idx][0] - lm[sho_idx][0]
        dy = lm[wr_idx][1] - lm[sho_idx][1]
        if abs(dx) < 1e-6:
            # Shoulder→wrist line perfectly vertical: club laid off / occluded.
            continue
        if abs(math.degrees(math.atan2(dy, dx))) > SHO_WRIST_STEEP_MAX:
            continue
        hinge = _hinge2d_shaft(lm, handed, roll_deg)
        # Sanity gate: keep only plausible acute hinge angles.
        if hinge is None or not (10.0 <= hinge <= 120.0):
            continue
        if best_hinge is None or hinge > best_hinge:
            best_frame, best_hinge = f, hinge
    return best_frame
def calculate_wrist_hinge_at_top(pose_data: Dict[int, List], swing_phases: Dict[str, List],
                                 handedness: str = 'right') -> Union[Dict[str, float], None]:
    """Calculate wrist hinge using the shaft-based method at address, P3 and top.

    Args:
        pose_data: frame index -> raw landmark list from the pose estimator.
        swing_phases: phase name -> frame-index list; expects "backswing",
            optionally "setup".
        handedness: 'right' or 'left'; selects the lead-side landmarks.

    Returns:
        Dict with acute forearm-shaft angles at top/P3, the address baseline,
        deltas, obtuse equivalents (for debugging/back-compat) and a
        `definition_suspect` flag -- or None when no frames are available.
    """
    # Define indices locally to avoid hidden globals
    L_EYE, R_EYE = 2, 5
    L_HIP, R_HIP = 23, 24
    sm = smooth_landmarks(pose_data)
    backswing_frames = swing_phases.get("backswing", [])
    if not backswing_frames:
        # Fallback: use any available frames as "backswing"
        all_frames = sorted(pose_data.keys())
        if all_frames:
            # Use middle third of available frames as "backswing"
            mid_start = len(all_frames) // 3
            mid_end = 2 * len(all_frames) // 3
            backswing_frames = all_frames[mid_start:mid_end] or all_frames
        else:
            return None  # Truly no data
    # Reuse consensus roll from shoulder tilt function
    def consensus_roll(lm, H=None, W=None):
        """Median camera-roll estimate (deg) from eye and hip lines; 0.0 if none."""
        cands = []
        for iL, iR in [(L_EYE, R_EYE), (L_HIP, R_HIP)]:
            if lm[iL] and lm[iR]:
                dx = lm[iR][0] - lm[iL][0]
                dy = lm[iR][1] - lm[iL][1]
                if abs(dx) + abs(dy) > 1e-6:
                    t = math.degrees(math.atan2(dy, dx))
                    t = ((t + 90.0) % 180.0) - 90.0  # fold into [-90, 90)
                    if abs(t) <= 20:
                        cands.append(t)
        return float(np.median(cands)) if cands else 0.0
    # Initial roll from setup frames (clipped conservatively)
    setup_frames = swing_phases.get("setup", [])[:8]
    setup_rolls = [consensus_roll(sm.get(f)) for f in setup_frames if sm.get(f)]
    setup_roll_deg = float(np.clip(np.median(setup_rolls) if setup_rolls else 0.0, -10.0, 10.0))
    # More robust top frame selection: try multiple methods.
    # BUGFIX: compare with `is None` -- frame index 0 is falsy, so the previous
    # `if not top_frame:` checks discarded a legitimately selected frame 0.
    top_frame = _pick_top_by_wrist_height(sm, backswing_frames, handed=handedness)
    if top_frame is None:
        top_frame = _pick_top_frame_hinge(sm, backswing_frames, handed=handedness, roll_deg=setup_roll_deg)
    if top_frame is None and backswing_frames:
        # Last resort: use the latest backswing frame with good elbow/wrist landmarks
        for f in reversed(backswing_frames):
            lm = sm.get(f)
            EL, WR = (13, 15) if handedness == 'right' else (14, 16)
            if lm and lm[EL] and lm[WR] and lm[EL][2] >= 0.5 and lm[WR][2] >= 0.5:
                top_frame = f
                dbg(f"[WRIST] Using fallback top frame: {f}")
                break
    # Ensure we never have None for top_frame
    if top_frame is None and backswing_frames:
        top_frame = backswing_frames[-1]
    # Compute local roll around top frame for better accuracy
    roll_deg = setup_roll_deg  # Start with setup roll as baseline
    if top_frame is not None:
        # Get frames around top for local roll calculation
        local_window = [f for f in range(max(backswing_frames[0], top_frame - 4),
                                         min(backswing_frames[-1], top_frame + 4) + 1) if f in backswing_frames]
        local_rolls = [consensus_roll(sm.get(f)) for f in local_window if sm.get(f)]
        if local_rolls:
            # Use wider clipping range for top position (±25°)
            local_roll = float(np.median(local_rolls))
            roll_deg = float(np.clip(local_roll, -25.0, 25.0))
            dbg(f"[WRIST] Local roll at top: {roll_deg:.1f}° (setup: {setup_roll_deg:.1f}°)")
        else:
            dbg(f"[WRIST] Using setup roll: {roll_deg:.1f}° (no local roll data)")
    # Calculate hinge with a moving window around the top frame for smoothness
    hinge_top = None
    if top_frame is not None:
        # Use ±2 frames around top for moving window average (5-frame window)
        win = [f for f in range(max(backswing_frames[0], top_frame - 2),
                                min(backswing_frames[-1], top_frame + 2) + 1) if f in backswing_frames]
        valid_hinges = []
        for f in win:
            landmarks = sm.get(f)
            if landmarks:
                h = _hinge2d_shaft(landmarks, handedness, roll_deg)
                if h is not None:
                    # h is the acute angle between forearm and shaft
                    if 10.0 <= h <= 120.0:  # plausible range for acute angles
                        valid_hinges.append((f, h))
                        dbg(f"[WRIST] frame {f}: hinge_acute={h:.1f}°")
        if valid_hinges:
            # Use moving window average of valid hinge values for smoothness
            hinge_values = [h for _, h in valid_hinges]
            if len(hinge_values) >= 3:
                # Remove outliers (> 2.5 MAD from median) and average the rest.
                # (Removed the unused `outlier_threshold` dead variable.)
                hinge_array = np.array(hinge_values)
                median_val = np.median(hinge_array)
                mad = np.median(np.abs(hinge_array - median_val))
                filtered_hinges = [h for h in hinge_values if abs(h - median_val) <= 2.5 * mad]
                if filtered_hinges:
                    hinge_top = float(np.mean(filtered_hinges))  # mean of inliers
                    dbg(f"[WRIST] Smoothed hinge (mean of {len(filtered_hinges)} values): {hinge_top:.1f}°")
                else:
                    hinge_top = float(median_val)
            else:
                # Not enough data for robust averaging, use median
                hinge_top = float(np.median(hinge_values))
                dbg(f"[WRIST] Limited data, using median of {len(hinge_values)} values: {hinge_top:.1f}°")
        else:
            # Fallback: find best single frame (maximum hinge in the window)
            best = (top_frame, None)
            for f in win:
                landmarks = sm.get(f)
                if landmarks:
                    h = _hinge2d_shaft(landmarks, handedness, roll_deg)
                    if h is not None and (best[1] is None or h > best[1]):
                        best = (f, h)
            top_frame, hinge_top = best
    # Additional validation to catch landmark selection errors (acute angles)
    if hinge_top is not None and (hinge_top < 5.0 or hinge_top > 120.0):
        dbg(f"WARNING: Suspicious acute hinge angle {hinge_top:.1f}° - possible landmark error")
        hinge_top = np.clip(hinge_top, 20.0, 90.0)  # Conservative clamp for acute angles
    dbg(f"top_selection: selected={top_frame}")
    # P3 frame and hinge with validation
    p3_frame = _pick_p3_lead_arm_parallel(sm, backswing_frames, handedness)
    hinge_p3 = None
    if p3_frame is not None:
        p3_landmarks = sm.get(p3_frame)
        if p3_landmarks:
            hinge_p3 = _hinge2d_shaft(p3_landmarks, handedness, roll_deg)
            # Validate P3 hinge angle is reasonable (acute angle)
            if hinge_p3 is not None and (hinge_p3 < 10.0 or hinge_p3 > 120.0):
                dbg(f"WARNING: Suspicious P3 acute hinge angle {hinge_p3:.1f}° - possible landmark error")
    # Address baseline with improved landmark validation
    addr_frames = _pick_stable_setup_frames(sm, swing_phases, max_frames=6) or _address_frames(swing_phases)
    addr_vals = []
    for f in addr_frames[:6]:
        addr_landmarks = sm.get(f)
        if addr_landmarks:
            h = _hinge2d_shaft(addr_landmarks, handedness, roll_deg)
            # Accept all valid measurements (no address hinge range filter)
            if h is not None:
                addr_vals.append(h)
    hinge_addr = float(np.median(addr_vals)) if addr_vals else None
    # If top hinge failed, borrow P3; if that also failed, borrow address
    if hinge_top is None:
        if hinge_p3 is not None:
            hinge_top = hinge_p3
            dbg(f"hinge_top was None, borrowed from P3: {hinge_top}")
        elif hinge_addr is not None:
            hinge_top = hinge_addr  # last resort (won't be graded as "top", but avoids None)
            dbg(f"hinge_top was None, borrowed from address: {hinge_top}")
    # hinge_top and hinge_p3 are acute angles (forearm-shaft); no conversion needed
    theta_top = hinge_top
    theta_p3 = hinge_p3
    # Debug prints to verify shaft angles
    if theta_top is not None:
        dbg(f"[WRIST-CHECK] theta_top = {theta_top:.1f}° (acute angle between forearm & shaft)")
    # Also calculate obtuse angles for backward compatibility and debugging
    obtuse_top = None if hinge_top is None else (180.0 - hinge_top)
    obtuse_p3 = None if hinge_p3 is None else (180.0 - hinge_p3)
    # Enhanced debug output with window information.
    # BUGFIX: `is not None` so a top frame of index 0 still reports its window size.
    window_size = len([f for f in range(max(backswing_frames[0], top_frame - 2),
                                        min(backswing_frames[-1], top_frame + 2) + 1)
                       if f in backswing_frames]) if top_frame is not None else 0
    hinge_str = f"{hinge_top:.1f}" if hinge_top is not None else "None"
    dbg(f"[WRIST] samples={window_size}, mean={hinge_str}°, window=±2")
    dbg(f"[WRIST] roll={roll_deg:.1f}°, addr={hinge_addr}, p3={hinge_p3}, top={hinge_top}, "
        f"Δp3={None if hinge_addr is None or hinge_p3 is None else hinge_p3-hinge_addr}, "
        f"Δtop={None if hinge_addr is None or hinge_top is None else hinge_top-hinge_addr}")
    # Trust but verify debug printout
    if hinge_top is not None:
        dbg(f"[WRIST] final wrist_hinge_top={hinge_top:.1f}° (handed={handedness})")
    else:
        dbg(f"[WRIST] final wrist_hinge_top=None (handed={handedness})")
    result = {
        # Report acute angles (what coaches expect: ~50-70° for pros)
        "forearm_shaft_angle_top_deg": theta_top,
        "forearm_shaft_angle_p3_deg": theta_p3,
        "addr_deg": hinge_addr,
        "delta_deg_p3": (hinge_p3 - hinge_addr) if (hinge_p3 is not None and hinge_addr is not None) else None,
        "delta_deg_top": (hinge_top - hinge_addr) if (hinge_top is not None and hinge_addr is not None) else None,
        # Standardize to acute angles for main reporting
        "acute_angle_deg_top": hinge_top,
        "acute_angle_deg_p3": hinge_p3,
        # Keep obtuse angles for debugging and backward compatibility
        "obtuse_angle_deg_top": obtuse_top,
        "obtuse_angle_deg_p3": obtuse_p3,
        # Flag outside typical pro band for P3 (acute angle range)
        "definition_suspect": (hinge_p3 is None) or not (40 <= hinge_p3 <= 80),
        # Legacy field for backward compatibility - use acute angle
        "radial_deviation_deg": hinge_p3 if hinge_p3 is not None else hinge_top
    }
    return result
def compute_front_facing_metrics(pose_data: Dict[int, List], swing_phases: Dict[str, List],
                                 world_landmarks: Optional[Dict[int, List]] = None,
                                 frames: Optional[List[np.ndarray]] = None,
                                 club: str = "iron",
                                 handedness: str = "right") -> Dict[str, Dict[str, Union[float, str, None]]]:
    """Compute the 4 front-facing golf swing metrics with club-aware grading.

    Args:
        pose_data: frame index -> image-space landmark list.
        swing_phases: phase name -> frame-index list.
        world_landmarks: optional frame -> 3D world-landmark mapping.
        frames: optional video frames, passed through to sub-metrics.
        club: club type hint (not used directly in this aggregator).
        handedness: 'right' or 'left'.

    Returns:
        Nested dict keyed by metric name; each entry holds at least a 'value'.
    """
    # Calculate individual metrics
    torso_sidebend_result = calculate_torso_sidebend_at_impact(
        pose_data, swing_phases, frames=frames, handedness=handedness, world_landmarks=world_landmarks
    )
    # Extract value for compatibility
    shoulder_tilt_impact = torso_sidebend_result.get("value") if torso_sidebend_result else None
    hip_sway_top = calculate_hip_sway_at_top(pose_data, swing_phases, world_landmarks)
    wrist_hinge_result = calculate_wrist_hinge_at_top(pose_data, swing_phases, handedness=handedness)
    hip_shoulder_sep_result = calculate_hip_shoulder_separation_at_impact(pose_data, swing_phases, handedness)
    # Extract wrist hinge values (standardized acute forearm-shaft angles)
    wrist_top_theta = wrist_hinge_result.get('forearm_shaft_angle_top_deg') if wrist_hinge_result else None
    wrist_p3_theta = wrist_hinge_result.get('forearm_shaft_angle_p3_deg') if wrist_hinge_result else None
    # BUGFIX: the previous code read 'radial_deviation_deg_p3'/'radial_deviation_deg_top',
    # keys that calculate_wrist_hinge_at_top never produces, so these were always None.
    # The traditional radial-deviation (obtuse) values live under the obtuse keys.
    wrist_p3 = wrist_hinge_result.get('obtuse_angle_deg_p3') if wrist_hinge_result else None
    wrist_top = wrist_hinge_result.get('obtuse_angle_deg_top') if wrist_hinge_result else None
    wrist_addr = wrist_hinge_result.get('addr_deg') if wrist_hinge_result else None
    delta_p3 = wrist_hinge_result.get('delta_deg_p3') if wrist_hinge_result else None
    delta_top = wrist_hinge_result.get('delta_deg_top') if wrist_hinge_result else None
    definition_suspect = wrist_hinge_result.get('definition_suspect', False) if wrist_hinge_result else False
    # Initialize to avoid undefined locals
    wrist_final = None
    wrist_kind = None
    # Prefer the standardized acute θ (top, then P3); fall back to radial deviation.
    # This matches the effective historical behavior: with the dead radial keys,
    # θ was what actually got reported.
    if wrist_top_theta is not None:
        wrist_final = wrist_top_theta
        wrist_kind = "forearm_shaft_angle_top"
    elif wrist_p3_theta is not None:
        wrist_final = wrist_p3_theta
        wrist_kind = "forearm_shaft_angle_p3"
    elif wrist_top is not None:
        wrist_final = wrist_top
        wrist_kind = "radial_deviation_top"
    else:
        wrist_final = wrist_p3
        wrist_kind = "radial_deviation_p3"
    # Add visible version tag and debug info
    print(f"[FF] {METRICS_VERSION} running (debug={'on' if VERBOSE else 'off'})")
    # Log metrics summary for debugging (BUGFIX: use a key that actually exists)
    dbg(f"[METRIC] torso_side_bend={shoulder_tilt_impact}, sway_top_in={hip_sway_top}, hinge_top={wrist_hinge_result.get('acute_angle_deg_top') if wrist_hinge_result else None}, hip_sho_sep={hip_shoulder_sep_result.get('hip_shoulder_sep_deg') if hip_shoulder_sep_result else None}")
    front_facing_metrics = {
        'torso_side_bend_deg': {
            'value': shoulder_tilt_impact,
            'metric': torso_sidebend_result.get("metric") if torso_sidebend_result else None,
            'quality': torso_sidebend_result.get("quality") if torso_sidebend_result else None,
            'details': torso_sidebend_result.get("details") if torso_sidebend_result else None
        },
        'shoulder_tilt_impact_deg': {'value': shoulder_tilt_impact},  # Deprecated, alias for compatibility
        'hip_sway_top_inches': {'value': hip_sway_top},
        'hip_shoulder_separation_impact_deg': {
            'value': hip_shoulder_sep_result.get('hip_shoulder_sep_deg') if hip_shoulder_sep_result else None,
            'abs_deg': hip_shoulder_sep_result.get('hip_shoulder_sep_abs_deg') if hip_shoulder_sep_result else None,
            'confidence': hip_shoulder_sep_result.get('confidence') if hip_shoulder_sep_result else 0.0
        },
        'wrist_hinge_top_deg': {
            'value': wrist_final,
            'value_kind': wrist_kind,
            'forearm_shaft_angle_top_deg': wrist_top_theta,
            'forearm_shaft_angle_p3_deg': wrist_p3_theta,
            'radial_deviation_p3_deg': wrist_p3,
            'radial_deviation_top_deg': wrist_top,
            'addr_deg': wrist_addr,
            'delta_p3_deg': delta_p3,
            'delta_top_deg': delta_top,
            'definition_suspect': definition_suspect
        },
        '__version__': METRICS_VERSION,
        '_debug': {
            'version': METRICS_VERSION,
            'used_world': bool(world_landmarks),
            'setup_frames': swing_phases.get('setup', [])[:8],
            'impact_frames': swing_phases.get('impact', []),
        }
    }
    return front_facing_metrics