diff --git "a/app/models/front_facing_metrics.py" "b/app/models/front_facing_metrics.py" --- "a/app/models/front_facing_metrics.py" +++ "b/app/models/front_facing_metrics.py" @@ -10,7 +10,7 @@ import numpy as np import cv2 from typing import Dict, List, Tuple, Optional, Union -# Note: grade_hip_turn function moved to streamlit_app.py to keep all grading logic in one place + # Landmark indices L_HIP, R_HIP = 23, 24 @@ -27,7 +27,7 @@ def dbg(msg): if VERBOSE: print(f"DEBUG: {msg}") # Version tag for tracking code changes -METRICS_VERSION = "front-ff v2025-09-21-fixed-shoulder-tilt" +METRICS_VERSION = "front-ff v2025-09-26-added-hip-shoulder-separation-4-metrics" def _best_foot_pair(lm, want_world=False): """ @@ -99,6 +99,14 @@ def signed_angle2(u, v): det = u[0]*v[1] - u[1]*v[0] return math.degrees(math.atan2(det, dot)) +def _eye_roll_deg(lx, ly, rx, ry): + """Calculate angle of the line from right eye to left eye""" + return math.degrees(math.atan2(ly - ry, lx - rx)) + +def _midpt(a, b): + """Calculate midpoint of two 2D points""" + return ((a[0] + b[0]) * 0.5, (a[1] + b[1]) * 0.5) + def smooth_landmarks(pose_data: Dict[int, List], alpha: float = 0.35) -> Dict[int, List]: """Apply exponential moving average smoothing to landmarks""" if not pose_data: @@ -135,324 +143,362 @@ def smooth_landmarks(pose_data: Dict[int, List], alpha: float = 0.35) -> Dict[in return smoothed_data -def calculate_shoulder_tilt_at_impact(pose_data, swing_phases, world_landmarks=None, - frames=None, image_shape=None, handedness='right'): +def has_world_ok_scale(world_landmarks, frame_idx=None): + """Check if world landmarks have reasonable scale (relaxed ranges for hip/shoulder span)""" + if not world_landmarks: + return False + + frames_to_check = [frame_idx] if frame_idx is not None else list(world_landmarks.keys())[:5] + + for f in frames_to_check: + if f not in world_landmarks: + continue + wf = world_landmarks[f] + if not wf or len(wf) < 25: + continue + + # Check hip span (relaxed range: 0.20-0.70m) + if wf[23] and wf[24]: # L_HIP, R_HIP + hip_span = math.hypot(wf[24][0] - wf[23][0], wf[24][2] - wf[23][2]) + if 0.20 <= hip_span <= 0.70: + dbg(f"[SCALE] Hip span {hip_span:.3f}m is reasonable") + return True + + # Check shoulder span (relaxed range: 0.25-0.70m) + if wf[11] and wf[12]: # L_SHO, R_SHO + sho_span = math.hypot(wf[12][0] - wf[11][0], wf[12][2] - wf[11][2]) + if 0.25 <= sho_span <= 0.70: + dbg(f"[SCALE] Shoulder span {sho_span:.3f}m is reasonable") + return True + + # Debug why scale check failed + if frames_to_check: + f = frames_to_check[0] + if f in world_landmarks: + wf = world_landmarks[f] + if wf and len(wf) >= 25: + if wf[23] and wf[24]: + hip_span = math.hypot(wf[24][0] - wf[23][0], wf[24][2] - wf[23][2]) + dbg(f"[SCALE] Hip span {hip_span:.3f}m out of range [0.20, 0.70]") + if wf[11] and wf[12]: + sho_span = math.hypot(wf[12][0] - wf[11][0], wf[12][2] - wf[11][2]) + dbg(f"[SCALE] Shoulder span {sho_span:.3f}m out of range [0.25, 0.70]") + + return False + +def unit3(v): + """Normalize 3D vector""" + n = math.sqrt(v[0]*v[0] + v[1]*v[1] + v[2]*v[2]) + return np.array(v) / n if n > 1e-6 else np.array([0.0, 0.0, 1.0]) + +def world_vec(world_landmarks, frame_idx, landmark_name): + """Get world vector for landmark""" + landmark_map = {'C7': 0, 'L_HIP': 23, 'R_HIP': 24} # C7 approximated as nose for now + if landmark_name == 'C7': + # Approximate C7 as midpoint between shoulders raised slightly + if (frame_idx in world_landmarks and len(world_landmarks[frame_idx]) > 12 and + world_landmarks[frame_idx][11] and world_landmarks[frame_idx][12]): + l_sho = np.array(world_landmarks[frame_idx][11][:3]) + r_sho = np.array(world_landmarks[frame_idx][12][:3]) + # Raise midpoint slightly to approximate C7 + c7_approx = (l_sho + r_sho) / 2 + np.array([0, 0.08, 0]) # 8cm higher + return c7_approx + + idx = landmark_map.get(landmark_name) + if idx is None or frame_idx not in world_landmarks: + return None + wf = world_landmarks[frame_idx] + if not wf or len(wf) <= idx or not wf[idx]: + return None + return np.array(wf[idx][:3]) + +def any_nan(arr): + """Check if array contains any NaN values""" + return np.any(np.isnan(arr)) if arr is not None else True + +def calibrate_sidebend_sign(world_landmarks, impact_idx, handedness, fwd_hat_3d): """ - Calculate shoulder tilt at impact with FIXED approach addressing the 1-10° issue. - - Key fixes implemented: - 1. Impact frame refinement: searches ±4 frames around detected impact for maximum shoulder tilt - 2. Removed camera-relative fallback (s_xz): always uses frontal-plane projection atan2(v_up, v_lat) - 3. Removed heuristic guards: no more "target axis mismatch" or "flat-2D guard" overrides - 4. Added world scale checks: shoulder span 0.22-0.70m, falls back to 2D homography if bad - 5. Simplified sign logic: positive = trail shoulder lower for right-handed golfers - 6. Comprehensive debug output showing all key measurements and frame selection - - This addresses the core issue where ~1.2cm shoulder drop was correctly yielding ~9° tilt, - but the wrong frame was being used and/or camera-relative fallbacks were diluting the result. + Returns +1 or -1: multiplier to convert 2D dx sign into 'away-from-target = +' + Uses short window around impact to vote with 3D sign, falls back to handed rule. """ - dbg("[TILT] calculate_shoulder_tilt_at_impact called") - L_SHO, R_SHO, L_EYE, R_EYE, L_HIP, R_HIP = 11, 12, 2, 5, 23, 24 + if not has_world_ok_scale(world_landmarks): + return 1 if handedness == 'right' else -1 - def get_HW(): + W = range(max(0, impact_idx-2), min(len(list(world_landmarks.keys())), impact_idx+3)) + votes = 0 + for i in W: + if i not in world_landmarks: + continue + c7 = world_vec(world_landmarks, i, 'C7') + l_hip = world_vec(world_landmarks, i, 'L_HIP') + r_hip = world_vec(world_landmarks, i, 'R_HIP') + if c7 is None or l_hip is None or r_hip is None: + continue + hip = (l_hip + r_hip) * 0.5 + spine = unit3(c7 - hip) + # lateral-right unit on ground plane from forward + lat = unit3(np.cross([0,1,0], fwd_hat_3d)) + s3d = np.sign(np.dot(spine, lat)) # + means leaning to player's right + votes += s3d + + # majority 3D says which side is player-right; convert to 'away from target = +' + # For RH, away-from-target == player-right; for LH, opposite. + prefer_player_right_positive = np.sign(votes) if votes != 0 else 1 + return prefer_player_right_positive if handedness == 'right' else -prefer_player_right_positive + +def calculate_torso_sidebend_at_impact(pose_data, swing_phases, frames=None, image_shape=None, handedness='right', world_landmarks=None): + """ + Front-facing: torso side-bend in degrees at impact. + Defined as angle between (neck midpoint → hip midpoint) and screen-vertical, + computed in derolled screen space. + """ + dbg(f"[SB] handed={handedness}") + + # --- Resolve impact frame (with a tiny +δ tolerance) --- + impact_frames = swing_phases.get("impact", []) + if impact_frames: + impact_idx = int(round(impact_frames[0])) + else: + # Fallback: find impact using peak hand speed if no swing phases provided + impact_idx = _find_impact_frame(pose_data, world_landmarks) + if impact_idx is None: + impact_idx = sorted(pose_data.keys())[-1] if pose_data else None + + # Surgical asserts to guarantee true impact frame + assert impact_idx is not None, "[SB] impact_idx None (missing world_landmarks pass-thru?)" + n = len(pose_data["landmarks"]) if "landmarks" in pose_data else len(pose_data) + assert impact_idx < n-1, f"[SB] Using last frame ({impact_idx}/{n-1}) — not true impact" + dbg(f"[SB] impact_idx={impact_idx} of {n} frames (OK)") + + if impact_idx is None: + dbg("[TORSO] No impact frame found") + return {"metric": "torso_side_bend_deg", "value": None, "quality": "no-data"} + + num_frames = len(pose_data["landmarks"]) if "landmarks" in pose_data else len(pose_data) + # TEMPORAL SMOOTHING: Use ±2-frame window around impact for better timing + candidates = [impact_idx - 2, impact_idx - 1, impact_idx, impact_idx + 1, impact_idx + 2] + candidates = [i for i in candidates if 0 <= i < num_frames and i in pose_data] + + # --- Improved de-roll using eyes + hips (not eyes-only) to reduce drift --- + lo = max(0, impact_idx - 4) + hi = min(num_frames - 1, impact_idx + 5) + combined_rolls = [] + for i in range(lo, hi + 1): + if i not in pose_data: + continue + L_EYE, R_EYE = 2, 5 + L_HIP, R_HIP = 23, 24 + lm = pose_data[i] + if not lm or len(lm) <= max(R_EYE, R_HIP): + continue + + # Collect both eye and hip roll angles + frame_rolls = [] + + # Eye roll + le, re = lm[L_EYE], lm[R_EYE] + if le is not None and re is not None: + (lx, ly), (rx, ry) = le[:2], re[:2] + frame_rolls.append(_eye_roll_deg(lx, ly, rx, ry)) + + # Hip roll + lh, rh = lm[L_HIP], lm[R_HIP] + if lh is not None and rh is not None: + (lx, ly), (rx, ry) = lh[:2], rh[:2] + frame_rolls.append(_eye_roll_deg(lx, ly, rx, ry)) + + # Use median of available rolls for this frame + if frame_rolls: + combined_rolls.append(np.median(frame_rolls)) + + roll_deg = 0.0 + if combined_rolls: + arr = np.array(combined_rolls, dtype=float) + lo_q, hi_q = np.percentile(arr, [15, 85]) + arr = arr[(arr >= lo_q) & (arr <= hi_q)] + roll_deg = float(np.median(np.clip(arr, -25.0, 25.0))) + dbg(f"[TORSO] eyes+hips roll (trimmed median) = {roll_deg:.1f}°") + + # --- Choose best candidate (prefer middle of temporal window) --- + valid_candidates = [] + L_SHO, R_SHO, L_HIP, R_HIP = 11, 12, 23, 24 + + for i in candidates: + lm = pose_data[i] + if not lm or len(lm) <= max(L_SHO, R_SHO, L_HIP, R_HIP): + continue + ls = lm[L_SHO] + rs = lm[R_SHO] + lh = lm[L_HIP] + rh = lm[R_HIP] + if None not in (ls, rs, lh, rh): + valid_candidates.append(i) + + if not valid_candidates: + dbg("[TORSO] No valid landmarks around impact") + return {"metric": "torso_side_bend_deg", "value": None, "quality": "no-data"} + + # TEMPORAL MEDIAN: Calculate side-bend from all valid frames for stability + def _get_HW(): if frames: H, W = frames[0].shape[:2]; return float(H), float(W) if image_shape and len(image_shape) >= 2: H, W = image_shape[:2]; return float(H), float(W) return None, None + H, W = _get_HW() + cx, cy = (W / 2.0 if W else 0.0), (H / 2.0 if H else 0.0) + rad = math.radians(-roll_deg) + cr, sr = math.cos(rad), math.sin(rad) + + def _deroll(pt): + x, y = pt[:2] # Handle both [x,y] and [x,y,vis] formats + x0, y0 = x - cx, y - cy + xr = cr * x0 - sr * y0 + cx + yr = sr * x0 + cr * y0 + cy + return (xr, yr) + + # Calculate side-bend for all valid frames and take median + L_SHO, R_SHO, L_HIP, R_HIP = 11, 12, 23, 24 + sidebend_values = [] + + for frame_idx in valid_candidates: + lm = pose_data[frame_idx] + ls = _deroll(lm[L_SHO]) + rs = _deroll(lm[R_SHO]) + lh = _deroll(lm[L_HIP]) + rh = _deroll(lm[R_HIP]) + + neck = _midpt(ls, rs) + midhip = _midpt(lh, rh) + dx = neck[0] - midhip[0] + dy = neck[1] - midhip[1] # y grows downward + + # 2D frontal-plane magnitude only + dy = abs(dy) + 1e-6 # Ensure positive denominator + sidebend_abs_frame = math.degrees(math.atan2(abs(dx), dy)) + sidebend_values.append((dx, sidebend_abs_frame)) # Store both dx and magnitude + + # Use median magnitude and median dx sign + if sidebend_values: + dx_values = [sv[0] for sv in sidebend_values] + abs_values = [sv[1] for sv in sidebend_values] + dx = np.median(dx_values) # Median dx for sign + sidebend_abs = np.median(abs_values) # Median magnitude + chosen = valid_candidates[len(valid_candidates)//2] # For debugging reference + else: + return {"metric": "torso_side_bend_deg", "value": None, "quality": "no-data"} + + # Get forward direction for sign calibration + fwd_hat_3d = None + if world_landmarks is not None: + setup_frames = swing_phases.get("setup", [])[:5] or sorted(world_landmarks.keys())[:5] + for f in setup_frames: + if f in world_landmarks: + toe_vec_3d = _toe_line_unit_world(world_landmarks[f]) + if toe_vec_3d is not None: + # Forward perpendicular to toe line in XZ plane + fwd_hat_3d = np.array([-toe_vec_3d[1], 0, toe_vec_3d[0]]) + break + + # Standardized sign convention: positive toward target for right-handed + sign_2d_raw = 1.0 if dx > 0 else -1.0 + cal = calibrate_sidebend_sign(world_landmarks, impact_idx, handedness, fwd_hat_3d) + + # Apply standardized sign convention: positive = away from target + # For right-handed: away from target = toward player's right + # For left-handed: away from target = toward player's left + final_sign = sign_2d_raw * cal + sidebend_deg = final_sign * sidebend_abs + + # Guardrails: clip extreme values but warn + if abs(sidebend_deg) > 30: + dbg(f"[SB] rejecting |sb|={abs(sidebend_deg):.1f}° > 30°, check roll/coords") + sidebend_deg = math.copysign(30.0, sidebend_deg) + + # Debug output matching expected format + dbg(f"[SB] 2D mag={sidebend_abs:.1f}°, cal={cal:+.0f}, final={sidebend_deg:.1f}°") + dbg(f"[TORSO] side-bend = {sidebend_deg:.1f}° (dx={dx:.1f}, dy={abs(dy):.1f}, abs={sidebend_abs:.1f}°)") + + # Optional: shoulder drop % of head height (kept because it's useful) + head_h = pose_data.get("head_h", {}).get(chosen) + drop_pct = None + if head_h and head_h > 0: + # lead vs trail by handedness; y increases downward in image coords + lead_y = ls[1] if handedness == 'right' else rs[1] + trail_y = rs[1] if handedness == 'right' else ls[1] + dy_sho = (lead_y - trail_y) + drop_pct = 100.0 * (dy_sho / head_h) + dbg(f"[TORSO] lead-shoulder drop = {drop_pct:.1f}% head") + + # Trust but verify debug printout + dbg(f"[SB] final torso_sidebend={sidebend_deg:.1f}° at frame={chosen}") + + return { + "metric": "torso_side_bend_deg", + "value": sidebend_deg, + "quality": "ok", + "details": { + "frame": chosen, + "roll_used_deg": roll_deg, + "shoulder_drop_pct_head": drop_pct + } + } - def scale_xy(dx, dy, H, W): - if H is None or W is None: return dx, dy - if max(abs(dx), abs(dy)) <= 2.0: # normalized → px - return dx * W, dy * H - return dx, dy - - def deroll(dx, dy, roll_deg): - cr, sr = math.cos(math.radians(roll_deg)), math.sin(math.radians(roll_deg)) - return (cr*dx + sr*dy, -sr*dx + cr*dy) # rotate by -roll - def consensus_roll(lm, H, W): - cands = [] - for iL, iR in [(L_EYE, R_EYE), (L_HIP, R_HIP)]: - if lm[iL] and lm[iR]: - dx, dy = scale_xy(lm[iR][0]-lm[iL][0], lm[iR][1]-lm[iL][1], H, W) - if abs(dx)+abs(dy) > 1e-6: - t = math.degrees(math.atan2(dy, dx)) - t = ((t + 90.0) % 180.0) - 90.0 - if abs(t) <= 20: cands.append(t) - return float(np.median(cands)) if cands else 0.0 +def calculate_shoulder_tilt_at_impact(*args, **kwargs): + """ + DEPRECATED: Shoulder tilt removed due to occlusion instability. + Returns torso side-bend instead for backward compatibility. + """ + dbg("[DEPRECATION] Shoulder tilt removed. Returning torso side-bend instead.") + res = calculate_torso_sidebend_at_impact(*args, **kwargs) + # keep old key for downstream consumers if needed: + return res["value"] # Return just the numeric value for compatibility - # --- NEW: helper functions for the fixed calculation --- - def unit3(v): - """Normalize 3D vector""" - norm = np.linalg.norm(v) - return v / norm if norm > 1e-6 else v - def _refine_impact_frame_for_max_tilt(initial_frame, world_landmarks, swing_phases): - """Search ±4 frames around impact for frame with maximum shoulder tilt""" - if not world_landmarks or initial_frame not in world_landmarks: - return initial_frame - - # Get target direction for consistent basis - setup_frames = swing_phases.get("setup", [])[:10] or sorted(world_landmarks.keys())[:8] - toes = [] - for f in setup_frames: - w = world_landmarks.get(f) - if not w: continue - toe_unit = _toe_line_unit_world(w) - if toe_unit: - vx, vz = toe_unit - fwd = np.array([-vz, 0.0, vx], float) - toes.append(fwd) - - if not toes: - return initial_frame - - t_hat = unit3(np.median(np.vstack(toes), axis=0)) - up = np.array([0.0, 1.0, 0.0], float) - lat = unit3(np.cross(up, t_hat)) +def _find_impact_frame(pose_data, world_landmarks=None): + """Find impact frame using peak hand speed if not provided in swing phases""" + if not world_landmarks: + return None + + frames = sorted(world_landmarks.keys()) + if len(frames) < 3: + return None + + speeds = [] + for i in range(1, len(frames)): + f_curr = frames[i] + f_prev = frames[i-1] - # Search ±4 frames around initial impact - best_frame = initial_frame - best_tilt_mag = 0.0 + curr_frame = world_landmarks.get(f_curr) + prev_frame = world_landmarks.get(f_prev) - for offset in range(-4, 5): - test_frame = initial_frame + offset - if test_frame not in world_landmarks: - continue + if (curr_frame and len(curr_frame) > 16 and + prev_frame and len(prev_frame) > 16 and + curr_frame[16] and prev_frame[16]): # Right wrist - wf = world_landmarks[test_frame] - if not wf or not wf[L_SHO] or not wf[R_SHO]: - continue + curr_wrist = curr_frame[16] + prev_wrist = prev_frame[16] - # Calculate shoulder vector and projections - s = np.array([wf[R_SHO][0]-wf[L_SHO][0], - wf[R_SHO][1]-wf[L_SHO][1], - wf[R_SHO][2]-wf[L_SHO][2]], float) - v_up = float(np.dot(s, up)) - v_lat = float(np.dot(s, lat)) + # Calculate 3D speed + dx = curr_wrist[0] - prev_wrist[0] + dy = curr_wrist[1] - prev_wrist[1] + dz = curr_wrist[2] - prev_wrist[2] + speed = math.sqrt(dx*dx + dy*dy + dz*dz) - # Find frame with maximum tilt magnitude - tilt_mag = abs(math.atan2(abs(v_up), max(abs(v_lat), 1e-6))) - if tilt_mag > best_tilt_mag: - best_tilt_mag = tilt_mag - best_frame = test_frame - - if best_frame != initial_frame: - dbg(f"[TILT] Impact frame refined from {initial_frame} to {best_frame} (tilt mag: {math.degrees(best_tilt_mag):.1f}°)") - - return best_frame - - def _shoulder_spans_ok(wf): - """Check if world landmark scale is reasonable""" - try: - Ls, Rs = wf[L_SHO], wf[R_SHO] - Lh, Rh = wf[L_HIP], wf[R_HIP] - shp = math.hypot(Rs[0]-Ls[0], Rs[2]-Ls[2]) # XZ shoulder span (m) - php = math.hypot(Rh[0]-Lh[0], Rh[2]-Lh[2]) # XZ pelvis span (m) - return shp, php - except Exception: - return None, None - - # --- helper functions for surgical fix --- - def _ortho_basis(lat_hat_2d): - lat = np.array(lat_hat_2d, float) - lat /= (np.linalg.norm(lat) + 1e-9) - up = np.array([0.0, -1.0], float) - # Gram–Schmidt: make up ⟂ lat - up = up - lat * float(np.dot(up, lat)) - up /= (np.linalg.norm(up) + 1e-9) - return lat, up - - H, W = get_HW() - impact = swing_phases.get("impact", []) - f_imp = (impact[0] if impact else (sorted(pose_data.keys())[-1] if pose_data else None)) - if f_imp is None: return None - - lm_imp = pose_data.get(f_imp) - if not lm_imp or not lm_imp[L_SHO] or not lm_imp[R_SHO]: - return None - - # --- 3D CORONAL-PLANE TILT (preferred if world_landmarks exist) --- - if world_landmarks and f_imp in world_landmarks: - wf = world_landmarks[f_imp] - if wf and wf[L_SHO] and wf[R_SHO]: - # Build orthonormal basis from target (XZ) and the global up - setup_frames = swing_phases.get("setup", [])[:10] or sorted(world_landmarks.keys())[:8] - toes = [] - for f in setup_frames: - w = world_landmarks.get(f) - if not w: continue - toe_unit = _toe_line_unit_world(w) - if toe_unit: - vx, vz = toe_unit # toe line (R-L) unit vector - # forward = perp to toe line in XZ - fwd = np.array([-vz, 0.0, vx], float) - toes.append(fwd) - - dbg(f"[TILT] 3D setup: found {len(toes)} valid toe vectors from {len(setup_frames)} frames") - if toes: - # Build orthonormal basis once (trust it unless clearly degenerate) - t_hat = unit3(np.median(np.vstack(toes), axis=0)) # target direction on ground - up = np.array([0.0, 1.0, 0.0], float) # gravity/up - lat = unit3(np.cross(up, t_hat)) # lateral (across chest) - # Re-orthogonalize to be safe: - t_hat = unit3(np.cross(lat, up)) - - # FIXED: Refine impact frame within ±4 around current to maximize |atan2(v_up, v_lat)| - f_imp_refined = _refine_impact_frame_for_max_tilt(f_imp, world_landmarks, swing_phases) - wf_final = world_landmarks[f_imp_refined] - - # Shoulder vector at refined frame - s = np.array([wf_final[R_SHO][0]-wf_final[L_SHO][0], - wf_final[R_SHO][1]-wf_final[L_SHO][1], - wf_final[R_SHO][2]-wf_final[L_SHO][2]], float) - - # ALWAYS measure in golfer's frontal plane: - v_up = float(np.dot(s, up)) - v_lat = float(np.dot(s, lat)) - - # Sanity checks to catch the "stuck at ~1–10°" cases - shoulder_span_3d = np.linalg.norm(s) - shoulder_span_m, pelvis_span_m = _shoulder_spans_ok(wf_final) - - # Comprehensive debug output - dbg(f"[TILT-3D] Final frame used: {f_imp_refined} (original: {f_imp})") - dbg(f"[TILT-3D] Shoulder vector s = ({s[0]:.3f}, {s[1]:.3f}, {s[2]:.3f}), |s| = {shoulder_span_3d:.3f} m") - dbg(f"[TILT-3D] Vertical difference: s_y = {s[1]:.3f} m ({s[1]*100:.1f} cm)") - dbg(f"[TILT-3D] Lateral component after basis projection: v_lat = {v_lat:.3f} m") - dbg(f"[TILT-3D] v_up = {v_up:.3f}, v_lat = {v_lat:.3f}") - dbg(f"[TILT-3D] Shoulder span (world): {shoulder_span_m:.3f} m" if shoulder_span_m else "[TILT-3D] Shoulder span: None") - dbg(f"[TILT-3D] Pelvis span (world): {pelvis_span_m:.3f} m" if pelvis_span_m else "[TILT-3D] Pelvis span: None") - - # Check for insufficient vertical separation - if abs(v_up) < 0.02: # Less than 2cm of shoulder drop - dbg(f"[TILT-3D] WARNING: Insufficient vertical separation (|v_up| = {abs(v_up)*100:.1f} cm < 2 cm)") - dbg(f"[TILT-3D] Check frame selection/camera alignment. Tilt will be < ~16°") - - # Check for unreasonable world scale - if shoulder_span_m is not None and (shoulder_span_m < 0.22 or shoulder_span_m > 0.70): - dbg(f"[TILT-3D] WARNING: Unreasonable shoulder span ({shoulder_span_m:.3f} m). Distrusting world landmarks.") - # Fall through to 2D homography method below - else: - # World landmarks seem reasonable - calculate frontal plane tilt - tilt_deg = math.degrees(math.atan2(abs(v_up), max(1e-6, abs(v_lat)))) - - # Simple sign rule: positive = trail (right) shoulder lower for right-handed golfers - if handedness == 'right': - tilt_deg = -tilt_deg if v_up < 0 else tilt_deg - else: - tilt_deg = tilt_deg if v_up < 0 else -tilt_deg - - dbg(f"[TILT-3D] atan2(|v_up|, |v_lat|) = atan2({abs(v_up):.3f}, {abs(v_lat):.3f}) = {math.degrees(math.atan2(abs(v_up), max(1e-6, abs(v_lat)))):.1f}°") - dbg(f"[TILT-3D] Final tilt: {tilt_deg:.1f}°") - - return float(np.clip(tilt_deg, -80.0, 80.0)) - - # --- 2D HOMOGRAPHY FALLBACK (when world landmarks fail) --- - dbg("[TILT] World landmarks failed/unavailable. Using 2D homography method.") - - # Get setup frames for homography - setup_frames = swing_phases.get("setup", [])[:8] - if not setup_frames: - setup_frames = sorted(pose_data.keys())[:8] + speeds.append((f_curr, speed)) - # Build homography from foot landmarks to ground plane - H, valid_H, H_frame = _build_foot_only_homography(smooth_landmarks(pose_data), setup_frames) - if not valid_H: - dbg("[TILT] Homography failed - insufficient foot landmarks. Returning None.") - return None - - import cv2 - def warp_pts(pts_img): - """Project image points to rectified ground plane""" - pts_array = np.array(pts_img, dtype=np.float32).reshape(1, -1, 2) - ground_pts = cv2.perspectiveTransform(pts_array, H)[0] - return ground_pts + if speeds: + # Find peak speed frame + peak_frame = max(speeds, key=lambda x: x[1])[0] + # Impact is typically 1 frame before peak speed + impact_frame = peak_frame - 1 + return impact_frame if impact_frame in frames else peak_frame - # Get shoulder points in the rectified ground plane - try: - shoulder_img = [[lm_imp[L_SHO][0], lm_imp[L_SHO][1]], - [lm_imp[R_SHO][0], lm_imp[R_SHO][1]]] - shoulder_ground = warp_pts(shoulder_img) - - # In rectified plane, horizontal = level ground, so vertical component is tilt - s_ground = shoulder_ground[1] - shoulder_ground[0] # Right - Left - v_lat_2d = s_ground[0] # lateral component (should be dominant) - v_up_2d = s_ground[1] # vertical component (tilt we want) - - # Calculate tilt in rectified plane - tilt_deg = math.degrees(math.atan2(abs(v_up_2d), max(1e-6, abs(v_lat_2d)))) - - # Apply handedness-aware sign rule - if handedness == 'right': - tilt_deg = tilt_deg if v_up_2d > 0 else -tilt_deg # positive = trail shoulder lower - else: - tilt_deg = -tilt_deg if v_up_2d > 0 else tilt_deg # flip for left-handed - - dbg(f"[TILT-2D] v_up_2d = {v_up_2d:.3f}, v_lat_2d = {v_lat_2d:.3f}") - dbg(f"[TILT-2D] atan2(|v_up|, |v_lat|) = {math.degrees(math.atan2(abs(v_up_2d), max(1e-6, abs(v_lat_2d)))):.1f}°") - dbg(f"[TILT-2D] Final 2D tilt: {tilt_deg:.1f}°") - - return float(np.clip(tilt_deg, -80.0, 80.0)) - - except Exception as e: - dbg(f"[TILT] 2D homography calculation failed: {e}") - return None - -def pelvis_hat(world_frame): - """Get unit pelvis vector (R_hip - L_hip) in XZ plane""" - try: - L = world_frame[L_HIP] # Left hip (23) - R = world_frame[R_HIP] # Right hip (24) - return unit2((R[0]-L[0], R[2]-L[2])) - except Exception: - return None - -def _pelvis_rel_to_target(world_frame, target_hat): - """Get pelvis orientation relative to target axis""" - ph = pelvis_hat(world_frame) - return None if ph is None else wrap180(signed_angle2(ph, target_hat)) + return None -def _address_pelvis_rel(world_landmarks, setup_frames, target_hat): - """Calculate median pelvis orientation relative to target at address""" - vals = [] - for f in setup_frames[:8]: - if f in world_landmarks: - rel = _pelvis_rel_to_target(world_landmarks[f], target_hat) - if rel is not None: - vals.append(rel) - return float(np.median(vals)) if vals else None -def grade_hip_turn(delta_deg: float, club: str = "iron") -> dict: - """ - Grade hip turn with club-aware bands and actionable tips - - NOTE: This function has been moved to streamlit_app.py to consolidate all grading logic. - This copy remains here temporarily to avoid circular imports. - TODO: Remove this function once a better import structure is established. - """ - c = club.lower() - bands = { - "driver": {"excellent": (38,48), "good_low": (32,38), "good_high": (48,52)}, - "iron": {"excellent": (32,40), "good_low": (28,32), "good_high": (40,44)}, - "wedge": {"excellent": (25,35), "good_low": (22,25), "good_high": (35,38)}, - } - b = bands["driver" if "driver" in c else "wedge" if "wedge" in c else "iron"] - x = delta_deg - - if b["excellent"][0] <= x <= b["excellent"][1]: - label, color, tip = "🟢 Excellent", "green", "Great rotation with control." - elif b["good_low"][0] <= x < b["good_low"][1]: - label, color, tip = "🟠 Good (slightly low)", "orange", "Open a touch more through impact." - elif b["good_high"][0] <= x <= b["good_high"][1]: - label, color, tip = "🟠 Good (slightly high)", "orange", "Post on lead leg; avoid over-spinning hips." - else: - label, color, tip = "🔴 Needs work", "red", ( - "Too little: shift then open. Too much: trail heel down longer; post into lead glute." - ) - return {"label": label, "color": color, "tip": tip} def _target_hat_from_toes_world(world_landmarks, setup_frames, handedness='right'): @@ -621,27 +667,97 @@ def _simple_pixel_sway_fallback(sm, setup_frames, top_frame, swing_phases=None, delta_px = top_ctr - setup_ctr sway_px = float(np.dot(delta_px, target_hat_px)) - # Improved px→inch scale using world landmarks if available - px_per_inch = 10.0 # default fallback - if world_landmarks is not None and f0 in world_landmarks: - try: - wf = world_landmarks[f0] - L_FOOT_INDEX, R_FOOT_INDEX = 31, 32 # foot_index points (toe tips) - Lt, Rt = wf[L_FOOT_INDEX], wf[R_FOOT_INDEX] - if Lt and Rt: - # 3D toe spacing in meters → inches - d_m = math.hypot(Rt[0]-Lt[0], Rt[2]-Lt[2]) - d_in_true = d_m * 39.3701 - # Pixel toe spacing - d_px = float(np.linalg.norm(toe_v)) - if d_px > 1e-3 and d_in_true > 1e-3: - px_per_inch = d_px / d_in_true - dbg(f"pixel_fallback: calibrated px_per_inch={px_per_inch:.2f} from 3D toes") - except Exception: - pass + # Improved px→inch scale using world landmarks with consistency check + px_per_inch_candidates = [] + default_px_per_inch = 10.0 + + if world_landmarks is not None: + # Try to get px_per_inch from multiple setup frames + for f in setup_ok: + if f in world_landmarks: + try: + wf = world_landmarks[f] + lm = get_lm(f) + L_FOOT_INDEX, R_FOOT_INDEX = 31, 32 # foot_index points (toe tips) + Lt, Rt = wf[L_FOOT_INDEX], wf[R_FOOT_INDEX] + if Lt and Rt and lm and lm[31] and lm[32]: + # 3D toe spacing in meters → inches + d_m = math.hypot(Rt[0]-Lt[0], Rt[2]-Lt[2]) + d_in_true = d_m * 39.3701 + # Pixel toe spacing for this frame + toe_v_frame = np.array([lm[32][0] - lm[31][0], lm[32][1] - lm[31][1]], dtype=float) + d_px = float(np.linalg.norm(toe_v_frame)) + if d_px > 1e-3 and d_in_true > 1e-3: + candidate = d_px / d_in_true + # Sanity check: reasonable range for px_per_inch (5-25 depending on resolution/crop) + if 5.0 <= candidate <= 25.0: + px_per_inch_candidates.append(candidate) + dbg(f"pixel_fallback: frame {f} px_per_inch={candidate:.2f}") + except Exception: + continue + + # Apply consistency check: reject frames with >10% deviation from median + if len(px_per_inch_candidates) > 1: + initial_median = np.median(px_per_inch_candidates) + consistent_candidates = [] + for val in px_per_inch_candidates: + deviation_pct = abs(val - initial_median) / initial_median * 100 + if deviation_pct <= 10.0: # Within 10% of median + consistent_candidates.append(val) + else: + dbg(f"pixel_fallback: rejecting inconsistent px_per_inch={val:.2f} (deviation={deviation_pct:.1f}%)") + px_per_inch_candidates = consistent_candidates + + # Use median of candidates if available, otherwise use default + if px_per_inch_candidates: + px_per_inch = float(np.median(px_per_inch_candidates)) + dbg(f"pixel_fallback: median px_per_inch={px_per_inch:.2f} from {len(px_per_inch_candidates)} consistent frames") + else: + px_per_inch = default_px_per_inch + dbg(f"pixel_fallback: using default px_per_inch={px_per_inch:.2f}") sway_inches = sway_px / px_per_inch - dbg(f"pixel_fallback: sway_px={sway_px:.1f}, px_per_inch={px_per_inch:.2f}, sway_in={sway_inches:.2f}") + + # Apply same robust baseline approach to pixel fallback + if len(setup_ctrs) >= 2: + setup_x_px = [c[0] for c in setup_ctrs] + t = np.arange(len(setup_x_px)) + m, b = np.polyfit(t, setup_x_px, 1) if len(setup_x_px) >= 2 else (0, setup_x_px[0]) + residuals = np.array(setup_x_px) - (m * t + b) + mad_residuals = np.median(np.abs(residuals - np.median(residuals))) + jitter_px = 1.4826 * mad_residuals + jitter_in = jitter_px / px_per_inch + else: + jitter_in = 0.1 # Minimal fallback + + # Calculate trust ratio + sway_jitter_ratio = abs(sway_inches) / max(jitter_in, 1e-6) + trust_flag = "trusted" if sway_jitter_ratio >= 1.5 else "low-trust" + + # Normalize by stance width if using pixel fallback and world landmarks available + normalized_sway = None + stance_width_in = None + if world_landmarks: + stance_widths = [] + for f in setup_ok: + if f in world_landmarks: + try: + Lt_w, Rt_w = world_landmarks[f][31], world_landmarks[f][32] + toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3])) + stance_widths.append(float(toe_world_m * 39.3701)) # Convert to inches + except: + pass + + if stance_widths: + stance_width_in = np.median(stance_widths) + if 14 <= stance_width_in <= 28: + normalized_sway = sway_inches / stance_width_in + + # Enhanced pixel fallback logging matching main function + norm_str = f"{normalized_sway:.3f}" if normalized_sway is not None else "NA" + dbg(f"pixel_fallback: dx={sway_inches:.2f}in, norm={norm_str}, jitter={jitter_in:.2f}in, ratio={sway_jitter_ratio:.1f}, trust={trust_flag}") + if stance_width_in: + dbg(f"pixel_fallback: stance_width={stance_width_in:.1f}\", target_normalized≈0.16 for pros") return sway_inches @@ -786,248 +902,336 @@ def _impact_frame(world_landmarks: Dict[int, List], swing_phases: Dict[str, List return impact_frames[0] if impact_frames else None +# Removed complex 2D fallback function - using simple direct measurement only +# ================== HIP-SHOULDER SEPARATION METRIC ================== -def calculate_hip_turn_at_impact(pose_data, swing_phases, world_landmarks=None, frames=None): - dbg("[HIP] calculate_hip_turn_at_impact called") - if not world_landmarks: - dbg("[HIP] No world landmarks - returning None") - return {"abs_deg": None, "delta_deg": None, "addr_deg": None} - - L_HIP, R_HIP = 23, 24 +def _angle_deg(p_left: np.ndarray, p_right: np.ndarray) -> float: + """ + Angle of the line Left->Right relative to screen-horizontal, in degrees. + Image coords: x right+, y down+. + """ + dx = float(p_right[0] - p_left[0]) + dy = float(p_right[1] - p_left[1]) + return np.degrees(np.arctan2(dy, dx)) + +def _wrap_deg(a: float) -> float: + """Wrap to (-180, 180].""" + a = (a + 180.0) % 360.0 - 180.0 + return a + +def _span(p_left: np.ndarray, p_right: np.ndarray) -> float: + """Euclidean span in pixels.""" + return float(np.hypot(p_right[0] - p_left[0], p_right[1] - p_left[1])) + +def _trimmed_median(values: List[float], trim: float = 0.2) -> float: + """Robust center estimate.""" + if not values: + return 0.0 + arr = np.sort(np.asarray(values, dtype=float)) + n = len(arr) + k = int(np.floor(trim * n)) + arr = arr[k: n - k] if n - 2*k > 0 else arr + return float(np.median(arr)) + +def _circular_mean_deg(values: List[float]) -> float: + """Circular mean for angle values to avoid wrap artifacts.""" + if not values: + return 0.0 + a = np.radians(values) + return float(np.degrees(np.arctan2(np.mean(np.sin(a)), np.mean(np.cos(a))))) + +def estimate_global_roll_deg( + lm_seq: np.ndarray, + setup_idxs: List[int], + use_eyes: bool = True +) -> float: + """ + Estimate camera roll from early setup frames using eyes + shoulders only. + Uses circular mean to avoid wrap artifacts. Excludes hips to prevent bias from pelvis rotation. + Returns a single roll angle in degrees (to subtract from later line angles). + """ + eye_angles, sho_angles = [], [] + for t in setup_idxs: + if t >= lm_seq.shape[0]: + continue + lm = lm_seq[t] + try: + if use_eyes and len(lm) > max(L_EYE, R_EYE): + eye_angles.append(_angle_deg(lm[L_EYE], lm[R_EYE])) + if len(lm) > max(L_SHO, R_SHO): + sho_angles.append(_angle_deg(lm[L_SHO], lm[R_SHO])) + except Exception: + continue - def pelvis_hat_xz(wf): - if not wf or not wf[L_HIP] or not wf[R_HIP]: - return None - vx, vz = wf[R_HIP][0]-wf[L_HIP][0], wf[R_HIP][2]-wf[L_HIP][2] - n = math.hypot(vx, vz) - return (vx/n, vz/n) if n > 1e-6 else None - - def face_from_hipline(ph): # rotate +90° to get facing - return (-ph[1], ph[0]) - - # Build forward axis (perp to toe line) robustly from setup - setup = swing_phases.get("setup", [])[:10] or sorted(world_landmarks.keys())[:5] - fwd_cands = [] - for f in setup: - wf = world_landmarks.get(f) - if not wf: continue - toe = _toe_line_unit_world(wf) - if not toe: continue - fwd = (-toe[1], toe[0]) - fwd_cands.append(fwd) - if not fwd_cands: - dbg("[HIP] No toe-line in world at setup; cannot compute") - return {"abs_deg": None, "delta_deg": None, "addr_deg": None} - fwd_hat = np.median(np.array(fwd_cands, float), axis=0) - fwd_hat = fwd_hat / (np.linalg.norm(fwd_hat) + 1e-9) - - # Impact frame - imp_frames = swing_phases.get("impact", []) - if not imp_frames: - dbg("[HIP] No impact frame") - return {"abs_deg": None, "delta_deg": None, "addr_deg": None} - f_imp = imp_frames[0] - - # Disambiguate forward sign with address→impact pelvis motion - def pelvis_ctr_xz(wf): - if not wf or not wf[L_HIP] or not wf[R_HIP]: - return None - return (0.5*(wf[L_HIP][0]+wf[R_HIP][0]), 0.5*(wf[L_HIP][2]+wf[R_HIP][2])) + # Combine all angles for circular mean (exclude hips to avoid pelvis bias) + all_angles = [] + if eye_angles: all_angles.extend(eye_angles) + if sho_angles: all_angles.extend(sho_angles) + + if not all_angles: + return 0.0 - addr_ctrs = [pelvis_ctr_xz(world_landmarks.get(f)) for f in setup] - addr_ctrs = [c for c in addr_ctrs if c is not None] - imp_ctr = pelvis_ctr_xz(world_landmarks.get(f_imp)) - if addr_ctrs and imp_ctr is not None: - addr_ctr = np.median(np.array(addr_ctrs, float), axis=0) - fwd_move = (imp_ctr[0]-addr_ctr[0], imp_ctr[1]-addr_ctr[1]) - if fwd_move[0]*fwd_hat[0] + fwd_move[1]*fwd_hat[1] < 0: - dbg("[HIP] Flipped fwd_hat based on addr→impact pelvis motion") - fwd_hat = (-fwd_hat[0], -fwd_hat[1]) - - def ang_signed(u, v): - return wrap180(math.degrees(math.atan2(u[1], u[0]) - math.atan2(v[1], v[0]))) - - # Address median - addr_angles = [] - for f in setup: - ph = pelvis_hat_xz(world_landmarks.get(f)) - if ph is None: continue - a = ang_signed(face_from_hipline(ph), fwd_hat) - if a > 90: a -= 180 - if a < -90: a += 180 - addr_angles.append(a) - addr_deg = float(np.median(addr_angles)) if addr_angles else None - - # Impact - ph_imp = pelvis_hat_xz(world_landmarks.get(f_imp)) - if ph_imp is None: - return {"abs_deg": None, "delta_deg": None, "addr_deg": addr_deg} - abs_deg = ang_signed(face_from_hipline(ph_imp), fwd_hat) - if abs_deg > 90: abs_deg -= 180 - if abs_deg < -90: abs_deg += 180 - - # PATCH B: Compare 3D and 2D, pick the plausible one - turn2d = _calculate_hip_turn_2d_fallback(pose_data, swing_phases, f_imp) - - # If 2D failed due to homography, try simple pixel-based estimate - if turn2d is None and abs_deg is not None and abs(abs_deg) > 55.0: - # Simple 2D estimate: assume reasonable impact turn is 30-45° - turn2d = 35.0 if abs_deg > 0 else -35.0 - dbg(f"[HIP] Homography failed, using simple 2D estimate: {turn2d}") - - # Heuristic selection: - cand = abs_deg - why = "3D" - - if turn2d is not None: - # Prefer the one closer to [15, 50] (typical impact open) - def band_cost(v): - if v is None: return 1e9 - return 0 if 15 <= abs(v) <= 50 else min(abs(abs(v)-15), abs(abs(v)-50)) - c3d, c2d = band_cost(abs_deg), band_cost(turn2d) + # Use circular mean to avoid wrap artifacts like 171° instead of -18° + return _circular_mean_deg(all_angles) + +def hip_shoulder_separation_deg( + lm_seq: np.ndarray, + idx: int, + setup_idxs: List[int], + handed: str = "R", + min_span_px: float = 20.0, + smooth_half_window: int = 2, # 5-frame median around idx + apply_yaw_correction: bool = True +) -> Dict[str, float]: + """ + Compute signed hip-shoulder separation at a given frame. + Positive (right-handed) => hips more open than shoulders (hips leading). + For left-handed, sign is flipped so positive still means hips leading. + + New features: + - Temporal smoothing: Uses median over ±smooth_half_window frames around idx + - Yaw compensation: Corrects for camera angle by comparing shoulder spans + - Geometry confidence: Flags when foreshortening may affect accuracy + + Returns: dict with: + 'hip_shoulder_sep_deg': Corrected separation angle + 'hip_shoulder_sep_abs_deg': Absolute value of separation + 'confidence': 0-1 quality of landmark detection + 'span_ratio': Shoulder span at idx vs address (for foreshortening check) + 'geometry_ok': True if camera appears square to player + """ + T = lm_seq.shape[0] + # Neighborhood for temporal median smoothing + lo = max(0, idx - smooth_half_window) + hi = min(T - 1, idx + smooth_half_window) + window = range(lo, hi + 1) + + # Estimate roll from setup + roll_deg = estimate_global_roll_deg(lm_seq, setup_idxs, use_eyes=True) + + hip_angles, sho_angles, confs = [], [], [] + for t in window: + if t >= T: + continue + lm = lm_seq[t] + if len(lm) <= max(L_SHO, R_SHO, L_HIP, R_HIP): + continue - # If 2D is much more plausible, take it - if c2d + 5 < c3d: - cand, why = turn2d, "2D_fallback" - dbg(f"[HIP] Using 2D fallback: 2D={turn2d:.1f} vs 3D={abs_deg:.1f}") + Ls, Rs = lm[L_SHO], lm[R_SHO] + Lh, Rh = lm[L_HIP], lm[R_HIP] + + # Confidence via spans + span_sho = _span(Ls, Rs) + span_hip = _span(Lh, Rh) + good = float((span_sho >= min_span_px) and (span_hip >= min_span_px)) + confs.append(good) + + # Raw angles + a_sho = _angle_deg(Ls, Rs) - roll_deg + a_hip = _angle_deg(Lh, Rh) - roll_deg + + # Wrap both post de-roll + a_sho = _wrap_deg(a_sho) + a_hip = _wrap_deg(a_hip) + + sho_angles.append(a_sho) + hip_angles.append(a_hip) + + if not sho_angles or not hip_angles: + return dict(hip_shoulder_sep_deg=0.0, hip_shoulder_sep_abs_deg=0.0, confidence=0.0) + + # Temporal median to reduce jitter + a_sho_med = float(np.median(sho_angles)) + a_hip_med = float(np.median(hip_angles)) + conf = float(np.mean(confs)) # 0..1 proportion of frames with decent span + + # Separation: hips minus shoulders + sep = _wrap_deg(a_hip_med - a_sho_med) + + # DEBUG PRINT: Add debugging info for the angles and separation + print(f"[DBG] roll={roll_deg:.1f}°, a_hip_med={a_hip_med:.1f}°, a_sho_med={a_sho_med:.1f}°, sep={sep:.1f}°") + + # FORESHORTENING CHECK: Compare shoulder span from address to impact + span_addr = None + span_ratio = 1.0 + geom_ok = True + severe_foreshortening = False - # Keep addr_deg and compute delta vs chosen absolute - abs_deg = cand - delta_deg = None if addr_deg is None else wrap180(abs_deg - addr_deg) + if apply_yaw_correction and setup_idxs: + try: + # Calculate median shoulder and hip spans during address + addr_sho_spans, addr_hip_spans = [], [] + for t in setup_idxs: + if t < lm_seq.shape[0]: + lm = lm_seq[t] + if len(lm) > max(L_SHO, R_SHO): + addr_sho_spans.append(_span(lm[L_SHO], lm[R_SHO])) + if len(lm) > max(L_HIP, R_HIP): + addr_hip_spans.append(_span(lm[L_HIP], lm[R_HIP])) + + if addr_sho_spans: + span_addr_sho = np.median(addr_sho_spans) + span_addr_hip = np.median(addr_hip_spans) if addr_hip_spans else span_addr_sho + + # Current frame spans + lm_current = lm_seq[idx] if idx < lm_seq.shape[0] else None + if lm_current is not None and len(lm_current) > max(L_SHO, R_SHO): + span_imp_sho = _span(lm_current[L_SHO], lm_current[R_SHO]) + span_imp_hip = _span(lm_current[L_HIP], lm_current[R_HIP]) if len(lm_current) > max(L_HIP, R_HIP) else span_imp_sho + + # Calculate ratios and use the better (less compressed) one + ratio_sho = span_imp_sho / (span_addr_sho + 1e-6) + ratio_hip = span_imp_hip / (span_addr_hip + 1e-6) + span_ratio = max(ratio_sho, ratio_hip) # Use less-compressed ratio + + print(f"[DBG] shoulder_span addr={span_addr_sho:.1f}px, impact={span_imp_sho:.1f}px, ratio_sho={ratio_sho:.2f}") + print(f"[DBG] hip_span addr={span_addr_hip:.1f}px, impact={span_imp_hip:.1f}px, ratio_hip={ratio_hip:.2f}") + print(f"[DBG] using_ratio={span_ratio:.2f} (better of sho/hip)") + + # Apply clamping to realistic range (0.7-1.0) to avoid underestimation + ratio_clamped = min(1.0, max(0.7, span_ratio)) + sep_raw = sep + sep = sep / ratio_clamped + + print(f"[DBG] sep_raw={sep_raw:.1f}°, sep_corr={sep:.1f}° (ratio_clamped={ratio_clamped:.2f})") + + # Geometry confidence flag - flag low-confidence results when span ratio <0.5 + geom_ok = span_ratio >= 0.7 # reasonable geometry + severe_foreshortening = span_ratio < 0.5 # flag low-confidence results + + except Exception as e: + print(f"[DBG] Foreshortening calculation failed: {e}") + + # Handedness normalization: keep "positive = hips leading" for both + if handed.upper().startswith("L"): + sep *= -1.0 + + # Adjust confidence based on foreshortening severity + final_conf = conf + if severe_foreshortening: + final_conf *= 0.2 # Very low confidence for severe foreshortening (span ratio <0.5) + elif not geom_ok: + final_conf *= 0.6 # Reduced confidence for geometry issues (span ratio <0.7) - dbg(f"[HIP] addr={addr_deg:.1f}, abs3D~{abs_deg:.1f}, turn2D~{turn2d}") - dbg(f"[HIP] fwd_hat=({fwd_hat[0]:.3f},{fwd_hat[1]:.3f}), addr={addr_deg}, " - f"abs={abs_deg}, delta={delta_deg}") + return dict( + hip_shoulder_sep_deg=float(sep), + hip_shoulder_sep_abs_deg=float(abs(sep)), + confidence=float(final_conf), + span_ratio=float(span_ratio), + geometry_ok=bool(geom_ok), + severe_foreshortening=bool(severe_foreshortening) + ) - return {"abs_deg": abs_deg, "delta_deg": delta_deg, "addr_deg": addr_deg} +def metric_at_address_top_impact( + lm_seq: np.ndarray, + address_idxs: List[int], + top_idx: Optional[int], + impact_idx: int, + handed: str = "R" +) -> Dict[str, Dict[str, float]]: + """ + Compute the metric at address (median over address_idxs), top (optional), and impact. + address_idxs: e.g., first 8–12 frames while static at setup. + """ + out = {} + # Address: use address window both for setup roll and target idx smoothing + addr_idx = int(np.median(address_idxs)) + out["address"] = hip_shoulder_separation_deg( + lm_seq, addr_idx, setup_idxs=address_idxs, handed=handed + ) + if top_idx is not None: + out["top"] = hip_shoulder_separation_deg( + lm_seq, top_idx, setup_idxs=address_idxs, handed=handed + ) + + out["impact"] = hip_shoulder_separation_deg( + lm_seq, impact_idx, setup_idxs=address_idxs, handed=handed + ) + return out -def _calculate_hip_turn_2d_fallback(pose_data: Dict[int, List], swing_phases: Dict[str, List], impact_frame: int) -> Optional[float]: +def calculate_hip_shoulder_separation_at_impact( + pose_data: Dict[int, List], + swing_phases: Dict[str, List], + handedness: str = 'right' +) -> Dict[str, float]: """ - TRUE 2D ground-plane fallback using homography - Only used if homography is valid and consistent + Calculate hip-shoulder separation at impact using the new metric. + + Args: + pose_data: Dictionary mapping frame indices to pose keypoints + swing_phases: Dictionary mapping phase names to lists of frame indices + handedness: 'right' or 'left' handed player + + Returns: + Dict with hip_shoulder_sep_deg, hip_shoulder_sep_abs_deg, confidence """ - sm = smooth_landmarks(pose_data) - setup_frames = swing_phases.get("setup", [])[:5] - if not setup_frames: - # Use first available frames as setup - all_frames = sorted(pose_data.keys()) - if all_frames: - setup_frames = all_frames[:min(5, len(all_frames))] - else: - return None + # Convert pose_data to numpy array format expected by the metric + if not pose_data: + return {"hip_shoulder_sep_deg": None, "hip_shoulder_sep_abs_deg": None, "confidence": 0.0} - # Handle None impact frame - if impact_frame is None: - # Use last available frame as impact - all_frames = sorted(pose_data.keys()) - if all_frames: - impact_frame = all_frames[-1] - else: - return None + # Get frame indices + frame_indices = sorted(pose_data.keys()) + max_frame_idx = max(frame_indices) + + # Create numpy array with shape [T, N, 2] where N >= 33 (Mediapipe format) + lm_seq = np.zeros((max_frame_idx + 1, 33, 2), dtype=float) + + # Fill the array with pose data + for frame_idx, landmarks in pose_data.items(): + if landmarks and len(landmarks) >= 25: # Need at least hip landmarks + for i, lm in enumerate(landmarks): + if i < 33 and lm and len(lm) >= 2: + lm_seq[frame_idx, i, 0] = lm[0] # x + lm_seq[frame_idx, i, 1] = lm[1] # y + + # Get swing phase indices + setup_frames = swing_phases.get("setup", []) + impact_frames = swing_phases.get("impact", []) + + if not setup_frames or not impact_frames: + dbg("[HIP-SHO] Missing setup or impact frames") + return {"hip_shoulder_sep_deg": None, "hip_shoulder_sep_abs_deg": None, "confidence": 0.0} + + # Use first 8-10 setup frames for address + address_idxs = setup_frames[:min(10, len(setup_frames))] + impact_idx = impact_frames[0] + + # Convert handedness format + handed = "R" if handedness.lower().startswith('r') else "L" - # Build ground-plane homography - H, valid_H, H_frame = _build_foot_only_homography(sm, setup_frames) - if not valid_H: - dbg("2D_fallback - INVALID HOMOGRAPHY") - return None - try: - import cv2 - - def warp_pts(pts_img): - """Project image points to ground plane""" - pts_array = np.array(pts_img, dtype=np.float32).reshape(1, -1, 2) - ground_pts = cv2.perspectiveTransform(pts_array, H)[0] - return ground_pts - - # Get ground-plane toe direction for target axis - toe_vecs_ground = [] - for f in setup_frames: - lm = sm.get(f) - if lm and len(lm) > 32 and lm[31] and lm[32]: - toe_img = [[lm[31][0], lm[31][1]], [lm[32][0], lm[32][1]]] - toe_ground = warp_pts(toe_img) - toe_vec = toe_ground[1] - toe_ground[0] # Right - Left - norm = np.linalg.norm(toe_vec) - if norm > 1e-6: - toe_vecs_ground.append(toe_vec / norm) - - if not toe_vecs_ground: - return None - - toe_hat_2d = np.median(toe_vecs_ground, axis=0) - toe_hat_2d = toe_hat_2d / (np.linalg.norm(toe_hat_2d) + 1e-9) - - # Target is PERPENDICULAR to toe line (raw) - target_hat_2d = np.array([-toe_hat_2d[1], toe_hat_2d[0]]) - target_hat_2d /= (np.linalg.norm(target_hat_2d) + 1e-9) - - # --- NEW: sign disambiguation using address -> impact movement --- - candidate_fwd = None - if "impact" in swing_phases and swing_phases["impact"]: - candidate_fwd = swing_phases["impact"][0] - elif "downswing" in swing_phases and swing_phases["downswing"]: - candidate_fwd = swing_phases["downswing"][-1] - else: - all_frames_sorted = sorted(pose_data.keys()) - candidate_fwd = all_frames_sorted[-1] if all_frames_sorted else None - - if candidate_fwd is not None: - # Get address and impact pelvis centers in ground plane - addr_ctrs = [] - for f in setup_frames: - lm = sm.get(f) - if lm and len(lm) > 32 and lm[23] and lm[24]: - hip_img = [[lm[23][0], lm[23][1]], [lm[24][0], lm[24][1]]] - hip_ground = warp_pts(hip_img) - addr_ctrs.append((hip_ground[0] + hip_ground[1]) / 2) - - lm_impact = sm.get(candidate_fwd) - if addr_ctrs and lm_impact and lm_impact[23] and lm_impact[24]: - hip_img = [[lm_impact[23][0], lm_impact[23][1]], [lm_impact[24][0], lm_impact[24][1]]] - impact_ground = warp_pts(hip_img) - impact_ctr = (impact_ground[0] + impact_ground[1]) / 2 - - addr_ctr = np.median(np.stack(addr_ctrs), axis=0) - fwd_vec = impact_ctr - addr_ctr - # If forward (address->impact) projects negative, flip target axis - if float(np.dot(fwd_vec, target_hat_2d)) < 0.0: - dbg("[HIP-2D] Flipped target_hat_2d based on addr→impact pelvis motion") - target_hat_2d = -target_hat_2d - - # Get pelvis directions in ground plane - def pelvis_hat_ground(frame_idx): - lm = sm.get(frame_idx) - if not lm or len(lm) <= 24 or not lm[23] or not lm[24]: - return None - hip_img = [[lm[23][0], lm[23][1]], [lm[24][0], lm[24][1]]] - hip_ground = warp_pts(hip_img) - pelvis_vec = hip_ground[1] - hip_ground[0] # Right - Left - norm = np.linalg.norm(pelvis_vec) - return pelvis_vec / norm if norm > 1e-6 else None - - # Impact pelvis direction - impact_pelvis_hat = pelvis_hat_ground(impact_frame) - - if impact_pelvis_hat is None: - return None - - # Absolute target-relative angle at impact (not delta from setup) - impact_rel = signed_angle2(impact_pelvis_hat, target_hat_2d) - turn_2d = wrap180(impact_rel) + # Calculate the metric at impact (5-frame median for temporal smoothing) + result = hip_shoulder_separation_deg( + lm_seq=lm_seq, + idx=impact_idx, + setup_idxs=address_idxs, + handed=handed, + smooth_half_window=2 # 5-frame median around impact (±2 frames) + ) + # Enhanced debug output with geometry information + geom_warning = "" + if result.get('severe_foreshortening', False): + geom_warning = " (severely underest. - severe foreshortening)" + elif not result.get('geometry_ok', True): + geom_warning = " (underest. likely; yaw/foreshortening)" - return float(turn_2d) + dbg(f"[HIP-SHO] Impact separation: {result['hip_shoulder_sep_deg']:.1f}°, confidence: {result['confidence']:.2f}, span_ratio: {result.get('span_ratio', 1.0):.2f}{geom_warning}") + return result - except Exception: - return None + except Exception as e: + dbg(f"[HIP-SHO] Calculation failed: {e}") + return {"hip_shoulder_sep_deg": None, "hip_shoulder_sep_abs_deg": None, "confidence": 0.0} + +# ================== END HIP-SHOULDER SEPARATION METRIC ================== def calculate_hip_sway_at_top(pose_data: Dict[int, List], swing_phases: Dict[str, List], world_landmarks: Optional[Dict[int, List]] = None) -> Union[float, None]: - sm = smooth_landmarks(pose_data) + # Apply additional temporal smoothing (Savitzky-Golay or EWMA) before measurement + sm = smooth_landmarks(pose_data, alpha=0.25) # Stronger smoothing for hip sway setup_frames = _pick_stable_setup_frames(sm, swing_phases, max_frames=6) if not setup_frames: return None backswing_frames = swing_phases.get("backswing", []) @@ -1074,72 +1278,321 @@ def calculate_hip_sway_at_top(pose_data: Dict[int, List], swing_phases: Dict[str dbg("[SWAY] Flipped fwd_hat based on addr→impact pelvis motion") fwd_hat = -fwd_hat + # Establish robust address baseline to stop drift from inflating jitter setup_ctr = np.median(np.stack(addr_ctrs), axis=0) + + # Robust baseline with detrending: use tight window around address + addr_ctrs_array = np.stack(addr_ctrs) + addr_x_positions = addr_ctrs_array[:, 0] # X coordinates only + + # Light detrend to remove slow drift + t = np.arange(len(addr_x_positions)) + if len(addr_x_positions) >= 2: + m, b = np.polyfit(t, addr_x_positions, 1) # Linear fit + residuals = addr_x_positions - (m * t + b) + # Robust jitter using MAD (Median Absolute Deviation) + mad_residuals = np.median(np.abs(residuals - np.median(residuals))) + setup_jitter_px = 1.4826 * mad_residuals # MAD to std conversion + addr_baseline_px = b # Robust address baseline + else: + setup_jitter_px = 0.0 + addr_baseline_px = addr_x_positions[0] if len(addr_x_positions) > 0 else setup_ctr[0] + + dbg(f"[SWAY] Robust baseline: jitter_px={setup_jitter_px:.2f} (was std-based), addr_baseline={addr_baseline_px:.1f}") + setup_jitter = setup_jitter_px # Use robust jitter + top_ctr = pelvis_ctr_g(top_frame) if top_ctr is None: return None + + # Apply jitter-aware correction - reduce small movements that may be noise + raw_delta = top_ctr - setup_ctr + if np.linalg.norm(raw_delta) < setup_jitter * 1.5: + dbg(f"[SWAY] Movement ({np.linalg.norm(raw_delta):.2f}) < 1.5x setup jitter ({setup_jitter:.2f}), applying correction") + # Partial correction for movements smaller than jitter threshold + correction_factor = max(0.0, (np.linalg.norm(raw_delta) - setup_jitter) / np.linalg.norm(raw_delta)) if np.linalg.norm(raw_delta) > 0 else 0.0 + corrected_delta = raw_delta * correction_factor + top_ctr = setup_ctr + corrected_delta + + # Validate vertical drift but DO NOT repick top to minimize it + TOP_DRIFT_GU_WARN = 3.0 # warn if pelvis vertical drift > 3 gu, but DO NOT change top + TOP_DRIFT_GU_REJECT = 6.0 # only reject if absurd + + vertical_drift = abs(top_ctr[1] - setup_ctr[1]) # Ground-plane Y difference + if vertical_drift > TOP_DRIFT_GU_REJECT: + dbg(f"[SWAY] vertical drift {vertical_drift:.1f} gu -> unreliable sway") + return None + elif vertical_drift > TOP_DRIFT_GU_WARN: + dbg(f"[SWAY] vertical drift {vertical_drift:.1f} gu (warn)") + + # DO NOT repick top based on vertical drift - top must be picked by club kinematics + + # Ground-plane calibration: gu_per_inch stabilization with consistency check + gu_per_inch_list = [] + use_pixel_fallback = False + + for k in setup_frames: + if world_landmarks and k in world_landmarks: + try: + # 1) World toe distance (meters -> inches) + Lt_w, Rt_w = world_landmarks[k][31], world_landmarks[k][32] + toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3])) + toe_world_in = float(toe_world_m * 39.3701) + + # 2) Ground-plane toe distance (same frame, via homography) + lmK = sm.get(k) + if lmK and lmK[31] and lmK[32]: + toe_g = warp2([ + [lmK[31][0], lmK[31][1]], + [lmK[32][0], lmK[32][1]], + ]) # shape (2,2) + toe_g_dist = float(np.linalg.norm(toe_g[0] - toe_g[1])) # ground units + + if toe_g_dist > 1e-3 and toe_world_in > 1e-3: + gu_per_in = toe_g_dist / max(toe_world_in, 1e-6) # ground-units per inch + + # Homography plausibility check: stance width should be ~8–30 inches + if 8.0 <= toe_world_in <= 30.0 and 0.1 <= gu_per_in <= 10.0: + gu_per_inch_list.append(gu_per_in) + dbg(f"[SWAY] frame {k}: toe_g={toe_g_dist:.2f} gu, toe_world={toe_world_in:.2f} in, gu_per_in={gu_per_in:.3f}") + else: + dbg(f"[SWAY] implausible frame {k}: toe_world_in={toe_world_in:.2f}, gu_per_in={gu_per_in:.3f}") + except Exception as e: + dbg(f"[SWAY] calibration failed for frame {k}: {e}") + + # Reject frames with inconsistent calibration (>10% deviation from median) + if len(gu_per_inch_list) > 1: + initial_median = np.median(gu_per_inch_list) + consistent_values = [] + for val in gu_per_inch_list: + deviation_pct = abs(val - initial_median) / initial_median * 100 + if deviation_pct <= 10.0: # Within 10% of median + consistent_values.append(val) + else: + dbg(f"[SWAY] Rejecting inconsistent gu_per_in={val:.3f} (deviation={deviation_pct:.1f}%)") + + if consistent_values: + gu_per_inch_list = consistent_values + dbg(f"[SWAY] Kept {len(consistent_values)}/{len(gu_per_inch_list)} consistent calibration values") + + if gu_per_inch_list: + gu_per_in = np.median(gu_per_inch_list) + gu_per_in = float(np.clip(gu_per_in, 0.1, 10.0)) # ground-plane appropriate range + use_pixel_fallback = False + dbg(f"[SWAY] using ground calibration: median gu_per_in={gu_per_in:.3f} from {len(gu_per_inch_list)} frames") + else: + dbg("[SWAY] no consistent ground calibration; will try pixel fallback") + use_pixel_fallback = True + gu_per_in = 1.0 + + # Option B pixel fallback with consistency check + if use_pixel_fallback and world_landmarks and setup_frames: + px_per_inch_list = [] + for k in setup_frames: + if k in world_landmarks: + try: + Lt_w, Rt_w = world_landmarks[k][31], world_landmarks[k][32] + toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3])) + toe_world_in = float(toe_world_m * 39.3701) + + lmK = sm.get(k) + if lmK and lmK[31] and lmK[32]: + # Image pixel distance + toe_px = float(np.linalg.norm(np.array(lmK[31][:2]) - np.array(lmK[32][:2]))) + + if toe_px > 1e-3 and toe_world_in > 1e-3: + px_per_in = toe_px / max(toe_world_in, 1e-6) + if 3.0 <= px_per_in <= 60.0: # image pixels appropriate range + px_per_inch_list.append(px_per_in) + dbg(f"[SWAY] FALLBACK frame {k}: toe_px={toe_px:.1f}, toe_world_in={toe_world_in:.2f}, px_per_in={px_per_in:.2f}") + except: + pass + + # Apply same consistency check to pixel fallback + if len(px_per_inch_list) > 1: + initial_median = np.median(px_per_inch_list) + consistent_px_values = [] + for val in px_per_inch_list: + deviation_pct = abs(val - initial_median) / initial_median * 100 + if deviation_pct <= 10.0: # Within 10% of median + consistent_px_values.append(val) + else: + dbg(f"[SWAY] Rejecting inconsistent px_per_in={val:.2f} (deviation={deviation_pct:.1f}%)") + px_per_inch_list = consistent_px_values + + if px_per_inch_list: + gu_per_in = np.median(px_per_inch_list) # reuse variable name for simplicity + dbg(f"[SWAY] using pixel fallback: median px_per_in={gu_per_in:.2f} from {len(px_per_inch_list)} frames") + else: + gu_per_in = 1.0 + dbg("[SWAY] all calibration failed, using unit scale") + + inch_scale = 1.0 / gu_per_in if gu_per_in > 0 else 1.0 - # inches calibration via world toes (if available) - inch_scale = 1.0 - if world_landmarks and H_frame in world_landmarks: + # Measure ∆x in ground units end-to-end for consistency + # Convert pelvis positions to ground units + setup_ctr_gu = np.array([addr_baseline_px, setup_ctr[1]]) # Use robust baseline + top_ctr_gu = top_ctr + + # Two-stage denoising: apply rolling median + EWMA smoothing to top position + # (Note: setup already detrended above) + if len(addr_ctrs) >= 3: + # Simple moving median for top position stability (if multiple candidates available) + top_candidates = [] + for f_nearby in range(max(setup_frames[0], top_frame-2), min(setup_frames[-1], top_frame+3)): + if f_nearby in range(len(pose_data)) and f_nearby != top_frame: + nearby_ctr = pelvis_ctr_g(f_nearby) + if nearby_ctr is not None: + top_candidates.append(nearby_ctr) + + if top_candidates and len(top_candidates) >= 2: + top_positions = np.stack([top_ctr] + top_candidates) + # Median filter followed by light smoothing + top_x_values = top_positions[:, 0] + top_x_median = np.median(top_x_values) + # EWMA with conservative alpha on the median-filtered result + alpha = 0.2 + top_ctr_gu = np.array([top_x_median, top_ctr[1]]) + dbg(f"[SWAY] Two-stage denoising: raw_top_x={top_ctr[0]:.2f}, median_filtered={top_x_median:.2f}") + + # Calculate sway in ground units, then convert to inches + if top_ctr_gu is not None and setup_ctr_gu is not None and fwd_hat is not None: + sway_gu = float(np.dot(top_ctr_gu - setup_ctr_gu, fwd_hat)) + hip_sway_in = sway_gu * inch_scale + else: + dbg("[SWAY] Missing required data for sway calculation") + return None + + # Convert jitter to inches using same scale + setup_jitter_in = setup_jitter * inch_scale if setup_jitter is not None else 0.0 + + # Lightweight yaw/foreshortening correction using hip span ratio + ratio_hip = 1.0 # Default if no correction available + if world_landmarks and setup_frames: try: - Lt, Rt = world_landmarks[H_frame][29], world_landmarks[H_frame][30] - d_m = math.hypot(Rt[0]-Lt[0], Rt[2]-Lt[2]) - d_in = d_m * 39.3701 - lmH = sm.get(H_frame) - toe_g = warp2([[lmH[31][0], lmH[31][1]], [lmH[32][0], lmH[32][1]]]) - d_g = float(np.linalg.norm(toe_g[1]-toe_g[0])) - if d_g > 1e-3 and d_in > 1e-3: inch_scale = d_in / d_g - except: pass - - sway_in = float(np.dot(top_ctr - setup_ctr, fwd_hat)) * inch_scale + # Calculate hip span ratio from setup to top for yaw correction + setup_hip_spans = [] + for f in setup_frames[:3]: # Use first few setup frames + if f in world_landmarks: + wf = world_landmarks[f] + if len(wf) > 24 and wf[23] and wf[24]: # L_HIP, R_HIP + span = np.linalg.norm(np.array(wf[24][:3]) - np.array(wf[23][:3])) + if 0.15 <= span <= 0.6: # Reasonable hip span in meters + setup_hip_spans.append(span) + + if setup_hip_spans and top_frame in world_landmarks: + wf_top = world_landmarks[top_frame] + if len(wf_top) > 24 and wf_top[23] and wf_top[24]: + top_hip_span = np.linalg.norm(np.array(wf_top[24][:3]) - np.array(wf_top[23][:3])) + setup_hip_span = np.median(setup_hip_spans) + ratio_hip = top_hip_span / setup_hip_span + ratio_hip = np.clip(ratio_hip, 0.80, 1.00) # Conservative clamp + dbg(f"[SWAY] Hip span ratio: {ratio_hip:.3f} (setup={setup_hip_span:.3f}m, top={top_hip_span:.3f}m)") + except Exception as e: + dbg(f"[SWAY] Hip span ratio calculation failed: {e}") + + # Apply yaw correction + hip_sway_in_corrected = hip_sway_in / ratio_hip + dbg(f"[SWAY] Yaw correction: raw={hip_sway_in:.2f}\", corrected={hip_sway_in_corrected:.2f}\" (ratio={ratio_hip:.3f})") + + # Use corrected value + hip_sway_in = hip_sway_in_corrected + + # STANCE WIDTH NORMALIZATION: Calculate stance width for normalized sway + stance_width_in = None + if gu_per_inch_list and world_landmarks: + # Use the same calibration frames to get stance width + stance_widths = [] + for k in setup_frames: + if k in world_landmarks: + try: + Lt_w, Rt_w = world_landmarks[k][31], world_landmarks[k][32] + toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3])) + stance_widths.append(float(toe_world_m * 39.3701)) # Convert to inches + except: + pass + if stance_widths: + stance_width_in = np.median(stance_widths) + + # Normalize and log (don't change return type) + normalized_sway = None + if stance_width_in and 14 <= stance_width_in <= 28: # Reasonable stance width range + normalized_sway = hip_sway_in / stance_width_in + + # Calculate trust ratio with robust jitter + sway_jitter_ratio = abs(hip_sway_in) / max(setup_jitter_in, 1e-6) + trust_flag = "trusted" if sway_jitter_ratio >= 1.5 else "low-trust" + + # Enhanced debug output with all the key metrics + norm_str = f"{normalized_sway:.3f}" if normalized_sway is not None else "NA" + dbg(f"[SWAY] dx={hip_sway_in:.2f}in, norm={norm_str}, jitter={setup_jitter_in:.2f}in, ratio={sway_jitter_ratio:.1f}") + + # Safe formatting for sway_gu which might be None in some edge cases + sway_gu_str = f"{sway_gu:.2f}" if sway_gu is not None else "None" + gu_per_in_str = f"{gu_per_in:.3f}" if gu_per_in is not None else "None" + dbg(f"[SWAY] addr→top pelvis Δx={sway_gu_str}gu @ gu_per_in={gu_per_in_str}, trust={trust_flag}") + + if stance_width_in: + dbg(f"[SWAY] stance_width={stance_width_in:.1f}\", target_normalized≈0.16 for pros") + + # Trust but verify debug printout + hip_sway_str = f"{hip_sway_in:.1f}" if hip_sway_in is not None else "None" + gu_per_in_final_str = f"{gu_per_in:.3f}" if gu_per_in is not None else "None" + dbg(f"[SWAY] final hip_sway_in={hip_sway_str}\" with gu_per_in={gu_per_in_final_str}") + # Return signed, un-clipped value to preserve direction; clip only for UI bands if desired - return float(sway_in) + return float(hip_sway_in) - - - - - - - -def _shaft_axis_hat(lm): +def _shaft_axis_hat(lm, handedness='right'): """Shaft axis proxy: line through wrists (trail->lead). Deterministic and stable.""" if not lm: return None L_WR, R_WR = 15, 16 if not lm[L_WR] or not lm[R_WR] or lm[L_WR][2] < 0.3 or lm[R_WR][2] < 0.3: return None - v = np.array([lm[L_WR][0] - lm[R_WR][0], lm[L_WR][1] - lm[R_WR][1]], float) # trail->lead + + # Build trail→lead vector based on handedness + if handedness == 'right': + # Right-handed: trail=16 (right wrist), lead=15 (left wrist) + trail_wrist, lead_wrist = R_WR, L_WR + else: + # Left-handed: trail=15 (left wrist), lead=16 (right wrist) + trail_wrist, lead_wrist = L_WR, R_WR + + v = np.array([lm[lead_wrist][0] - lm[trail_wrist][0], lm[lead_wrist][1] - lm[trail_wrist][1]], float) # trail->lead n = np.linalg.norm(v) return v / n if n > 1e-6 else None def _hinge2d_shaft(lm, handed='right', roll_deg=0.0): - """FIXED: Standardized wrist hinge = angle between forearm and shaft vectors. - Uses proper landmark selection and avoids inconsistent angle definitions.""" + """Lead-forearm only wrist hinge = angle between lead forearm and shaft vectors. + Softened gating and consistent lead-only measurement.""" if not lm: return None - # FIXED: Consistent landmark selection - use elbow and wrist for forearm + # Lead forearm only - consistent landmark selection EL, WR = (13, 15) if handed=='right' else (14, 16) # lead elbow/wrist - if not lm[EL] or not lm[WR] or lm[EL][2] < 0.5 or lm[WR][2] < 0.5: + if not lm[EL] or not lm[WR] or lm[EL][2] < 0.4 or lm[WR][2] < 0.4: # Relaxed visibility threshold return None - # FIXED: Standardized definition - forearm vector from elbow to wrist + # Lead forearm vector from elbow to wrist forearm_vec = np.array([lm[WR][0] - lm[EL][0], lm[WR][1] - lm[EL][1]], float) - # FIXED: Better shaft proxy detection to avoid using wrong landmarks - shaft_vec = _shaft_axis_hat(lm) + # Shaft vector using both wrists for more stable direction + shaft_vec = _shaft_axis_hat(lm, handed) if shaft_vec is None: - # Fallback: use wrist to estimated club head position - # This prevents using elbow instead of wrist as mentioned in the issue - if lm[15] and lm[16] and lm[15][2] >= 0.5 and lm[16][2] >= 0.5: - shaft_vec = np.array([lm[16][0] - lm[15][0], lm[16][1] - lm[15][1]], float) + # Fallback: use wrist-to-wrist vector as shaft proxy + if lm[15] and lm[16] and lm[15][2] >= 0.4 and lm[16][2] >= 0.4: + # Build trail→lead vector based on handedness for consistency + if handed == 'right': + trail_wrist, lead_wrist = 16, 15 + else: + trail_wrist, lead_wrist = 15, 16 + shaft_vec = np.array([lm[lead_wrist][0] - lm[trail_wrist][0], lm[lead_wrist][1] - lm[trail_wrist][1]], float) else: return None - # De-roll both vectors by -roll_deg - cr, sr = math.cos(math.radians(roll_deg)), math.sin(math.radians(roll_deg)) - def deroll(v): return np.array([cr*v[0] + sr*v[1], -sr*v[0] + cr*v[1]], float) + # De-roll both vectors using the provided roll correction + cr, sr = math.cos(math.radians(-roll_deg)), math.sin(math.radians(-roll_deg)) + def deroll(v): return np.array([cr*v[0] - sr*v[1], sr*v[0] + cr*v[1]], float) forearm_vec = deroll(forearm_vec) shaft_vec = deroll(shaft_vec) @@ -1149,17 +1602,19 @@ def _hinge2d_shaft(lm, handed='right', roll_deg=0.0): if forearm_norm < 1e-6 or shaft_norm < 1e-6: return None - # FIXED: Standardized hinge angle calculation using proper dot product - # hinge_angle = angle between forearm and shaft vectors + # Calculate angle between lead forearm and shaft cos_angle = float(np.clip(np.dot(forearm_vec, shaft_vec) / (forearm_norm * shaft_norm), -1.0, 1.0)) - hinge_angle = math.degrees(math.acos(cos_angle)) + angle_between = math.degrees(math.acos(abs(cos_angle))) # Always positive angle + + # Report acute angle (angle between forearm and shaft) instead of obtuse + # This puts results in the expected pro range (~50-70° acute) + hinge_acute = angle_between + hinge_obtuse = 180.0 - angle_between # Traditional radial deviation for debugging - # Convert to traditional hinge measurement (complement angle for intuitive interpretation) - # where 90° = perpendicular (good hinge), 0° = inline (poor hinge) - raw_angle = 180.0 - hinge_angle + # Debug logging: keep both for debugging but standardize to acute + dbg(f"[HINGE] acute={hinge_acute:.1f}°, obtuse={hinge_obtuse:.1f}° (reporting acute)") - # PATCH C: Return raw final_hinge (no clip) - return float(raw_angle) + return float(hinge_acute) def _pelvis_ctr_ground_series(sm, frames, warp2): @@ -1222,32 +1677,119 @@ def _pick_p3_lead_arm_parallel(sm, backswing_frames, handed='right', tol_deg=15. best_abs, best = ang, f return best -def _pick_top_by_wrist_height(sm, backswing, handed='right'): - """Pick top frame by highest wrist position (most robust)""" +def _pick_top_by_wrist_height(sm, backswing, handed='right', min_vis=0.7): + """Pick top frame by highest wrist position with improved validation""" WR = 15 if handed=='right' else 16 + SH = 11 if handed=='right' else 12 # lead shoulder + best, best_y = None, 1e9 + candidates = [] + consecutive_rejects = 0 + max_consecutive_rejects = 10 # Stop after N consecutive rejects to reduce spam + for f in backswing: lm = sm.get(f) - if lm and lm[WR] and lm[WR][2] >= 0.5: - y = lm[WR][1] # smaller y = higher hand (screen coords) - if y < best_y: - best_y, best = y, f + if not lm or not lm[WR] or not lm[SH] or lm[WR][2] < min_vis or lm[SH][2] < min_vis: + consecutive_rejects += 1 + if consecutive_rejects >= max_consecutive_rejects: + dbg(f"[WRIST] Skipping {max_consecutive_rejects}+ consecutive bad frames, jumping ahead") + break + continue + + # Reject frames with shoulder-to-wrist line near vertical (club laid off/occluded) + # Calculate angle of shoulder-to-wrist line from horizontal + dx = lm[WR][0] - lm[SH][0] + dy = lm[WR][1] - lm[SH][1] + + if abs(dx) < 1e-6: # Perfectly vertical + consecutive_rejects += 1 + if consecutive_rejects >= max_consecutive_rejects: + dbg(f"[WRIST] Skipping {max_consecutive_rejects}+ consecutive steep frames, jumping ahead") + break + continue + + angle_from_horizontal = abs(math.degrees(math.atan2(dy, dx))) + + # Unified gating constant - use everywhere that gates by shoulder-wrist angle + SHO_WRIST_STEEP_MAX = 120.0 # was ~105 in some functions + + if angle_from_horizontal > SHO_WRIST_STEEP_MAX: + consecutive_rejects += 1 + if consecutive_rejects >= max_consecutive_rejects: + dbg(f"[WRIST] Skipping {max_consecutive_rejects}+ consecutive steep frames, jumping ahead") + break + # Only log first few rejects to reduce spam + if consecutive_rejects <= 3: + dbg(f"[WRIST] Rejecting frame {f}: shoulder-wrist angle {angle_from_horizontal:.1f}° too steep") + continue + + # Found a good frame, reset consecutive reject counter + consecutive_rejects = 0 + + y = lm[WR][1] # smaller y = higher hand (screen coords) + candidates.append((f, y, angle_from_horizontal)) + + if y < best_y: + best_y, best = y, f + + # Additional validation: ensure chosen frame is not an outlier + if best is not None and candidates: + # Check if there are other reasonable candidates nearby in height + height_tolerance = 20 # pixels + reasonable_candidates = [c for c in candidates if abs(c[1] - best_y) <= height_tolerance] + + if len(reasonable_candidates) > 1: + # Among height-similar candidates, prefer one with better shoulder-wrist geometry + reasonable_candidates.sort(key=lambda x: x[2]) # Sort by angle (smaller is better) + best_candidate = reasonable_candidates[0] + best = best_candidate[0] + dbg(f"[WRIST] Selected frame {best} with angle {best_candidate[2]:.1f}° from {len(reasonable_candidates)} candidates") + return best def _pick_top_frame_hinge(sm, backswing_frames, handed='right', roll_deg=0.0): - """Pick frame with maximum hinge in backswing""" + """Pick frame with maximum hinge in backswing, with visibility and geometry validation""" if not backswing_frames: return None - # Top = max hinge in backswing - top_vals = [(f, _hinge2d_shaft(sm.get(f), handed, roll_deg)) for f in backswing_frames] - top_vals = [(f, h) for f, h in top_vals if h is not None] + SH = 11 if handed=='right' else 12 # lead shoulder + WR = 15 if handed=='right' else 16 # lead wrist + + # Restrict to P3→P4 window by limiting to last 70% of backswing + p3_start_idx = max(0, int(0.3 * len(backswing_frames))) + p3_p4_frames = backswing_frames[p3_start_idx:] + + top_vals = [] + for f in p3_p4_frames: + lm = sm.get(f) + if not lm or not lm[WR] or not lm[SH] or lm[WR][2] < 0.7 or lm[SH][2] < 0.7: + continue + + # Reject frames with shoulder-to-wrist line near vertical (same logic as height-based) + dx = lm[WR][0] - lm[SH][0] + dy = lm[WR][1] - lm[SH][1] + + if abs(dx) < 1e-6: # Perfectly vertical + continue + + angle_from_horizontal = abs(math.degrees(math.atan2(dy, dx))) + SHO_WRIST_STEEP_MAX = 120.0 # Unified constant + if angle_from_horizontal > SHO_WRIST_STEEP_MAX: # Use unified threshold + continue + + h = _hinge2d_shaft(lm, handed, roll_deg) + if h is not None: + # Additional sanity check for hinge values (h is now acute angle) + if 10.0 <= h <= 120.0: # plausible range for acute angles + top_vals.append((f, h)) + if not top_vals: return None - top_frame, hinge_top = max(top_vals, key=lambda x: x[1]) + # Return frame with maximum hinge + top_frame = max(top_vals, key=lambda x: x[1])[0] return top_frame def calculate_wrist_hinge_at_top(pose_data: Dict[int, List], swing_phases: Dict[str, List], @@ -1284,14 +1826,15 @@ def calculate_wrist_hinge_at_top(pose_data: Dict[int, List], swing_phases: Dict[ if abs(t) <= 20: cands.append(t) return float(np.median(cands)) if cands else 0.0 + # Initial roll from setup frames (clipped conservatively) setup_frames = swing_phases.get("setup", [])[:8] - rolls = [consensus_roll(sm.get(f)) for f in setup_frames if sm.get(f)] - roll_deg = float(np.clip(np.median(rolls) if rolls else 0.0, -10.0, 10.0)) + setup_rolls = [consensus_roll(sm.get(f)) for f in setup_frames if sm.get(f)] + setup_roll_deg = float(np.clip(np.median(setup_rolls) if setup_rolls else 0.0, -10.0, 10.0)) # More robust top frame selection: try multiple methods top_frame = _pick_top_by_wrist_height(sm, backswing_frames, handed=handedness) if not top_frame: - top_frame = _pick_top_frame_hinge(sm, backswing_frames, handed=handedness, roll_deg=roll_deg) + top_frame = _pick_top_frame_hinge(sm, backswing_frames, handed=handedness, roll_deg=setup_roll_deg) if not top_frame and backswing_frames: # Last resort: use frame with max backswing index that has good landmarks for f in reversed(backswing_frames): @@ -1306,32 +1849,73 @@ def calculate_wrist_hinge_at_top(pose_data: Dict[int, List], swing_phases: Dict[ if top_frame is None and backswing_frames: top_frame = backswing_frames[-1] - # FIXED: Calculate hinge with ±3 frame local search for max hinge consistency + # Compute local roll around top frame for better accuracy + roll_deg = setup_roll_deg # Start with setup roll as baseline + if top_frame is not None: + # Get frames around top for local roll calculation + local_window = [f for f in range(max(backswing_frames[0], top_frame-4), + min(backswing_frames[-1], top_frame+5)+1) if f in backswing_frames] + local_rolls = [consensus_roll(sm.get(f)) for f in local_window if sm.get(f)] + if local_rolls: + # Use wider clipping range for top position (±20-25°) + local_roll = float(np.median(local_rolls)) + roll_deg = float(np.clip(local_roll, -25.0, 25.0)) + dbg(f"[WRIST] Local roll at top: {roll_deg:.1f}° (setup: {setup_roll_deg:.1f}°)") + else: + dbg(f"[WRIST] Using setup roll: {roll_deg:.1f}° (no local roll data)") + + # FIXED: Calculate hinge with moving window average around top frame for smoothness hinge_top = None if top_frame is not None: - # Re-evaluate within ±3 frames around top for max hinge (local search) - win = [f for f in range(max(backswing_frames[0], top_frame-3), - min(backswing_frames[-1], top_frame+3)+1) if f in backswing_frames] - best = (top_frame, None) + # Use ±2 frames around top for moving window average (5-frame window) + win = [f for f in range(max(backswing_frames[0], top_frame-2), + min(backswing_frames[-1], top_frame+2)+1) if f in backswing_frames] + valid_hinges = [] for f in win: landmarks = sm.get(f) if landmarks: h = _hinge2d_shaft(landmarks, handedness, roll_deg) if h is not None: - # h here is "radial" (180 - angle_between); convert to angle_between for sanity check - angle_between = 180.0 - h - if 25.0 <= angle_between <= 150.0: # plausible - if best[1] is None or h > best[1]: - best = (f, h) - # else skip bad frames + # h here is now acute angle (angle between forearm and shaft) + if 10.0 <= h <= 120.0: # plausible range for acute angles + valid_hinges.append((f, h)) + dbg(f"[WRIST] frame {f}: hinge_acute={h:.1f}°") - top_frame, hinge_top = best + if valid_hinges: + # Use moving window average of valid hinge values for smoothness + hinge_values = [h for _, h in valid_hinges] + if len(hinge_values) >= 3: + # Remove outliers and take median for robustness + hinge_array = np.array(hinge_values) + median_val = np.median(hinge_array) + mad = np.median(np.abs(hinge_array - median_val)) + outlier_threshold = median_val + 2.5 * mad # Remove values more than 2.5 MAD from median + filtered_hinges = [h for h in hinge_values if abs(h - median_val) <= 2.5 * mad] + if filtered_hinges: + hinge_top = float(np.mean(filtered_hinges)) # Use mean of filtered values + dbg(f"[WRIST] Smoothed hinge (mean of {len(filtered_hinges)} values): {hinge_top:.1f}°") + else: + hinge_top = float(median_val) + else: + # Not enough data for robust averaging, use median + hinge_top = float(np.median(hinge_values)) + dbg(f"[WRIST] Limited data, using median of {len(hinge_values)} values: {hinge_top:.1f}°") + else: + # Fallback: find best single frame + best = (top_frame, None) + for f in win: + landmarks = sm.get(f) + if landmarks: + h = _hinge2d_shaft(landmarks, handedness, roll_deg) + if h is not None and (best[1] is None or h > best[1]): + best = (f, h) + top_frame, hinge_top = best - # Additional validation to catch landmark selection errors - if hinge_top is not None and (hinge_top < 10.0 or hinge_top > 170.0): - dbg(f"WARNING: Suspicious hinge angle {hinge_top:.1f}° - possible landmark error") - hinge_top = np.clip(hinge_top, 30.0, 150.0) # Conservative clamp + # Additional validation to catch landmark selection errors (now acute angles) + if hinge_top is not None and (hinge_top < 5.0 or hinge_top > 120.0): + dbg(f"WARNING: Suspicious acute hinge angle {hinge_top:.1f}° - possible landmark error") + hinge_top = np.clip(hinge_top, 20.0, 90.0) # Conservative clamp for acute angles dbg(f"top_selection: selected={top_frame}") @@ -1342,9 +1926,9 @@ def calculate_wrist_hinge_at_top(pose_data: Dict[int, List], swing_phases: Dict[ p3_landmarks = sm.get(p3_frame) if p3_landmarks: hinge_p3 = _hinge2d_shaft(p3_landmarks, handedness, roll_deg) - # Validate P3 hinge angle is reasonable - if hinge_p3 is not None and (hinge_p3 < 20.0 or hinge_p3 > 160.0): - dbg(f"WARNING: Suspicious P3 hinge angle {hinge_p3:.1f}° - possible landmark error") + # Validate P3 hinge angle is reasonable (now acute angle) + if hinge_p3 is not None and (hinge_p3 < 10.0 or hinge_p3 > 120.0): + dbg(f"WARNING: Suspicious P3 acute hinge angle {hinge_p3:.1f}° - possible landmark error") # FIXED: Address baseline with improved landmark validation addr_frames = _pick_stable_setup_frames(sm, swing_phases, max_frames=6) or _address_frames(swing_phases) @@ -1353,11 +1937,9 @@ def calculate_wrist_hinge_at_top(pose_data: Dict[int, List], swing_phases: Dict[ addr_landmarks = sm.get(f) if addr_landmarks: h = _hinge2d_shaft(addr_landmarks, handedness, roll_deg) - # Validate address hinge is reasonable (typically 30-80°) - if h is not None and 20.0 <= h <= 100.0: + # Remove address hinge range filter - accept all valid measurements + if h is not None: addr_vals.append(h) - elif h is not None: - dbg(f"WARNING: Excluded address hinge {h:.1f}° - outside expected range") hinge_addr = float(np.median(addr_vals)) if addr_vals else None # If top hinge failed, borrow P3; if that also failed, borrow address @@ -1372,31 +1954,49 @@ def calculate_wrist_hinge_at_top(pose_data: Dict[int, List], swing_phases: Dict[ # Remove auto-promotion to avoid convergence bias # Keep original values to show real differences - # Compute θ (forearm-shaft angle) from radial deviation values AFTER all promotions - theta_top = None if hinge_top is None else (180.0 - hinge_top) - theta_p3 = None if hinge_p3 is None else (180.0 - hinge_p3) + # hinge_top and hinge_p3 are now acute angles (forearm-shaft angle) + theta_top = hinge_top # No conversion needed, already acute + theta_p3 = hinge_p3 # No conversion needed, already acute - # Debug prints to verify shaft angles (now matches finalized values) + # Debug prints to verify shaft angles if theta_top is not None: - dbg(f"[WRIST-CHECK] theta_top = {theta_top:.1f}° (angle between forearm & shaft)") - + dbg(f"[WRIST-CHECK] theta_top = {theta_top:.1f}° (acute angle between forearm & shaft)") + + # Also calculate obtuse angles for backward compatibility and debugging + obtuse_top = None if hinge_top is None else (180.0 - hinge_top) + obtuse_p3 = None if hinge_p3 is None else (180.0 - hinge_p3) + + # Enhanced debug output with window information + window_size = len([f for f in range(max(backswing_frames[0], top_frame-2), + min(backswing_frames[-1], top_frame+2)+1) if f in backswing_frames]) if top_frame else 0 + hinge_str = f"{hinge_top:.1f}" if hinge_top is not None else "None" + dbg(f"[WRIST] samples={window_size}, mean={hinge_str}°, window=±2") dbg(f"[WRIST] roll={roll_deg:.1f}°, addr={hinge_addr}, p3={hinge_p3}, top={hinge_top}, " f"Δp3={None if hinge_addr is None or hinge_p3 is None else hinge_p3-hinge_addr}, " f"Δtop={None if hinge_addr is None or hinge_top is None else hinge_top-hinge_addr}") + + # Trust but verify debug printout + if hinge_top is not None: + dbg(f"[WRIST] final wrist_hinge_top={hinge_top:.1f}° (handed={handedness})") + else: + dbg(f"[WRIST] final wrist_hinge_top=None (handed={handedness})") result = { - # Report θ at top and P3 (what coaches expect from screenshots) + # Report acute angles (what coaches expect: ~50-70° for pros) "forearm_shaft_angle_top_deg": theta_top, "forearm_shaft_angle_p3_deg": theta_p3, "addr_deg": hinge_addr, "delta_deg_p3": (hinge_p3 - hinge_addr) if (hinge_p3 is not None and hinge_addr is not None) else None, "delta_deg_top": (hinge_top - hinge_addr) if (hinge_top is not None and hinge_addr is not None) else None, - # PATCH C: UI can decide which to display; keep both raw values - "radial_deviation_deg_top": hinge_top, - "radial_deviation_deg_p3": hinge_p3, - # Flag outside typical pro band for P3 - "definition_suspect": (hinge_p3 is None) or not (50 <= hinge_p3 <= 130), - # Legacy field for backward compatibility + # NEW: Standardize to acute angles for main reporting + "acute_angle_deg_top": hinge_top, + "acute_angle_deg_p3": hinge_p3, + # Keep obtuse angles for debugging and backward compatibility + "obtuse_angle_deg_top": obtuse_top, + "obtuse_angle_deg_p3": obtuse_p3, + # Flag outside typical pro band for P3 (now using acute angle range) + "definition_suspect": (hinge_p3 is None) or not (40 <= hinge_p3 <= 80), + # Legacy field for backward compatibility - use acute angle "radial_deviation_deg": hinge_p3 if hinge_p3 is not None else hinge_top } return result @@ -1407,51 +2007,17 @@ def compute_front_facing_metrics(pose_data: Dict[int, List], swing_phases: Dict[ frames: Optional[List[np.ndarray]] = None, club: str = "iron", handedness: str = "right") -> Dict[str, Dict[str, Union[float, str, None]]]: - """Compute the 4 required front-facing golf swing metrics with club-aware grading""" + """Compute the 4 front-facing golf swing metrics with club-aware grading""" # Calculate individual metrics - shoulder_tilt_impact = calculate_shoulder_tilt_at_impact( - pose_data, swing_phases, world_landmarks, frames=frames, handedness=handedness + torso_sidebend_result = calculate_torso_sidebend_at_impact( + pose_data, swing_phases, frames=frames, handedness=handedness, world_landmarks=world_landmarks ) - hip_turn_result = calculate_hip_turn_at_impact(pose_data, swing_phases, world_landmarks, frames) + # Extract value for compatibility + shoulder_tilt_impact = torso_sidebend_result.get("value") if torso_sidebend_result else None hip_sway_top = calculate_hip_sway_at_top(pose_data, swing_phases, world_landmarks) wrist_hinge_result = calculate_wrist_hinge_at_top(pose_data, swing_phases, handedness=handedness) - - # Process hip turn results - hip_turn_data = {} - if hip_turn_result: - hip_abs = hip_turn_result.get('abs_deg') - hip_delta = hip_turn_result.get('delta_deg') - hip_addr = hip_turn_result.get('addr_deg') - - # Prefer absolute turn at impact for user-facing value - display_value = hip_abs if hip_abs is not None else hip_delta - value_type = "absolute" if hip_abs is not None else "delta" - - # Grade also on absolute if available - grade_basis = display_value - if hip_abs is None and hip_delta is not None: - grade_basis = abs(hip_delta) - else: - grade_basis = abs(display_value) if display_value is not None else None - - # Get club-aware grading - if grade_basis is not None: - grading = grade_hip_turn(grade_basis, club) - hip_turn_data = { - 'value': display_value, - 'abs_deg': hip_abs, - 'delta_deg': hip_delta, - 'addr_deg': hip_addr, - 'value_type': value_type, - 'grade_label': grading['label'], - 'grade_color': grading['color'], - 'grade_tip': grading['tip'], - } - else: - hip_turn_data = {'value': None} - else: - hip_turn_data = {'value': None} + hip_shoulder_sep_result = calculate_hip_shoulder_separation_at_impact(pose_data, swing_phases, handedness) # Extract wrist hinge values (NEW: prioritize forearm-shaft angle measurements) wrist_top_theta = wrist_hinge_result.get('forearm_shaft_angle_top_deg') if wrist_hinge_result else None @@ -1485,12 +2051,22 @@ def compute_front_facing_metrics(pose_data: Dict[int, List], swing_phases: Dict[ print(f"[FF] {METRICS_VERSION} running (debug={'on' if VERBOSE else 'off'})") # Log metrics summary for debugging - dbg(f"[METRIC] tilt={shoulder_tilt_impact}, hip_turn_abs={hip_turn_result.get('abs_deg') if hip_turn_result else None}, sway_top_in={hip_sway_top}, hinge_top={wrist_hinge_result.get('radial_deviation_deg_top') if wrist_hinge_result else None}") + dbg(f"[METRIC] torso_side_bend={shoulder_tilt_impact}, sway_top_in={hip_sway_top}, hinge_top={wrist_hinge_result.get('radial_deviation_deg_top') if wrist_hinge_result else None}, hip_sho_sep={hip_shoulder_sep_result.get('hip_shoulder_sep_deg') if hip_shoulder_sep_result else None}") front_facing_metrics = { - 'shoulder_tilt_impact_deg': {'value': shoulder_tilt_impact}, - 'hip_turn_impact_deg': hip_turn_data, + 'torso_side_bend_deg': { + 'value': shoulder_tilt_impact, + 'metric': torso_sidebend_result.get("metric") if torso_sidebend_result else None, + 'quality': torso_sidebend_result.get("quality") if torso_sidebend_result else None, + 'details': torso_sidebend_result.get("details") if torso_sidebend_result else None + }, + 'shoulder_tilt_impact_deg': {'value': shoulder_tilt_impact}, # Deprecated, alias for compatibility 'hip_sway_top_inches': {'value': hip_sway_top}, + 'hip_shoulder_separation_impact_deg': { + 'value': hip_shoulder_sep_result.get('hip_shoulder_sep_deg') if hip_shoulder_sep_result else None, + 'abs_deg': hip_shoulder_sep_result.get('hip_shoulder_sep_abs_deg') if hip_shoulder_sep_result else None, + 'confidence': hip_shoulder_sep_result.get('confidence') if hip_shoulder_sep_result else 0.0 + }, 'wrist_hinge_top_deg': { 'value': wrist_final, 'value_kind': wrist_kind, @@ -1512,4 +2088,4 @@ def compute_front_facing_metrics(pose_data: Dict[int, List], swing_phases: Dict[ } } - return front_facing_metrics + return front_facing_metrics \ No newline at end of file