Par-ity_Project / app /models /front_facing_metrics.py
chenemii's picture
Fix front-facing golf metrics: improve accuracy and reliability
33b443b
"""
Front-Facing Golf Swing Metrics Calculator
This module contains all the mathematical calculations for front-facing golf swing metrics.
The metrics_calculator.py file handles DTL (Down-the-Line) view metrics.
"""
import math
import numpy as np
import cv2
from typing import Dict, List, Tuple, Optional, Union
# Landmark indices
L_HIP, R_HIP = 23, 24
L_SHO, R_SHO = 11, 12
L_EYE, R_EYE = 2, 5
# Debug helper - set to True to see detailed metric calculations
VERBOSE = False
def set_debug(on: bool = True):
global VERBOSE
VERBOSE = bool(on)
def dbg(msg):
if VERBOSE: print(f"DEBUG: {msg}")
# Version tag for tracking code changes
METRICS_VERSION = "front-ff v2025-09-26-added-hip-shoulder-separation-4-metrics"
def _best_foot_pair(lm, want_world=False):
"""
Return a robust (left,right) foot pair tuple for TOES/FOOT-INDEX with highest availability.
In MediaPipe Pose:
29 = L_HEEL, 30 = R_HEEL, 31 = L_FOOT_INDEX (toe tip), 32 = R_FOOT_INDEX (toe tip)
We prefer (31,32). If missing, fall back to (29,30).
"""
if lm is None: return None
# World landmarks have 3D tuples; image landmarks have [x,y,vis]
def ok(pt):
if not pt: return False
if want_world:
return len(pt) >= 3 and all(np.isfinite(pt[:3]))
else:
return len(pt) >= 3 and pt[2] >= 0.4 and np.isfinite(pt[0]) and np.isfinite(pt[1])
# Prefer FOOT_INDEX (toes)
l_idx, r_idx = 31, 32
if (len(lm) > r_idx) and ok(lm[l_idx]) and ok(lm[r_idx]):
return (l_idx, r_idx)
# Fall back to HEEL
l_idx, r_idx = 29, 30
if (len(lm) > r_idx) and ok(lm[l_idx]) and ok(lm[r_idx]):
return (l_idx, r_idx)
return None
def _toe_line_unit_img(lm, H, W, roll_deg):
"""
2D: return unit toe-line vector in the de-rolled image frame.
Uses best available (31,32) or (29,30). Returns None if unavailable.
"""
pair = _best_foot_pair(lm, want_world=False)
if not pair: return None
iL, iR = pair
dx = (lm[iR][0] - lm[iL][0]) * (W if max(abs(lm[iR][0]-lm[iL][0]), abs(lm[iR][1]-lm[iL][1])) <= 2.0 else 1.0)
dy = (lm[iR][1] - lm[iL][1]) * (H if max(abs(lm[iR][0]-lm[iL][0]), abs(lm[iR][1]-lm[iL][1])) <= 2.0 else 1.0)
cr, sr = math.cos(math.radians(roll_deg)), math.sin(math.radians(roll_deg))
tx, ty = (cr*dx + sr*dy, -sr*dx + cr*dy) # de-rolled
n = math.hypot(tx, ty)
if n < 1e-6: return None
return (tx/n, ty/n)
def _toe_line_unit_world(wf):
"""
3D/XZ: return unit toe-line vector (R-L) in XZ. Prefers (31,32), else (29,30).
"""
pair = _best_foot_pair(wf, want_world=True)
if not pair: return None
iL, iR = pair
vx = wf[iR][0] - wf[iL][0]
vz = wf[iR][2] - wf[iL][2]
n = math.hypot(vx, vz)
if n < 1e-6: return None
return (vx/n, vz/n)
# Core math utilities
def wrap180(a):
return (a + 180.0) % 360.0 - 180.0
def unit2(v):
"""Normalize 2D vector"""
n = (v[0]*v[0] + v[1]*v[1])**0.5
return (v[0]/n, v[1]/n) if n > 1e-6 else (0.0, 1.0)
def signed_angle2(u, v):
"""Signed angle between two 2D vectors in degrees (-180, 180]"""
dot = u[0]*v[0] + u[1]*v[1]
det = u[0]*v[1] - u[1]*v[0]
return math.degrees(math.atan2(det, dot))
def _eye_roll_deg(lx, ly, rx, ry):
"""Calculate angle of the line from right eye to left eye"""
return math.degrees(math.atan2(ly - ry, lx - rx))
def _midpt(a, b):
"""Calculate midpoint of two 2D points"""
return ((a[0] + b[0]) * 0.5, (a[1] + b[1]) * 0.5)
def smooth_landmarks(pose_data: Dict[int, List], alpha: float = 0.35) -> Dict[int, List]:
"""Apply exponential moving average smoothing to landmarks"""
if not pose_data:
return pose_data
frame_indices = sorted(pose_data.keys())
smoothed_data = {}
for frame_idx in frame_indices:
landmarks = pose_data[frame_idx]
if not landmarks or not isinstance(landmarks, list):
continue
smoothed_landmarks = []
for i, lm in enumerate(landmarks):
if lm and isinstance(lm, list) and len(lm) >= 3:
x, y, vis = lm[0], lm[1], lm[2]
# Get previous smoothed values
if frame_idx > 0 and frame_idx - 1 in smoothed_data:
prev_lms = smoothed_data[frame_idx - 1]
if (i < len(prev_lms) and prev_lms[i] and
isinstance(prev_lms[i], list) and len(prev_lms[i]) >= 3):
px, py, pv = prev_lms[i][0], prev_lms[i][1], prev_lms[i][2]
x = alpha * x + (1 - alpha) * px
y = alpha * y + (1 - alpha) * py
vis = alpha * vis + (1 - alpha) * pv
smoothed_landmarks.append([x, y, vis])
else:
smoothed_landmarks.append(lm)
smoothed_data[frame_idx] = smoothed_landmarks
return smoothed_data
def has_world_ok_scale(world_landmarks, frame_idx=None):
"""Check if world landmarks have reasonable scale (relaxed ranges for hip/shoulder span)"""
if not world_landmarks:
return False
frames_to_check = [frame_idx] if frame_idx is not None else list(world_landmarks.keys())[:5]
for f in frames_to_check:
if f not in world_landmarks:
continue
wf = world_landmarks[f]
if not wf or len(wf) < 25:
continue
# Check hip span (relaxed range: 0.20-0.70m)
if wf[23] and wf[24]: # L_HIP, R_HIP
hip_span = math.hypot(wf[24][0] - wf[23][0], wf[24][2] - wf[23][2])
if 0.20 <= hip_span <= 0.70:
dbg(f"[SCALE] Hip span {hip_span:.3f}m is reasonable")
return True
# Check shoulder span (relaxed range: 0.25-0.70m)
if wf[11] and wf[12]: # L_SHO, R_SHO
sho_span = math.hypot(wf[12][0] - wf[11][0], wf[12][2] - wf[11][2])
if 0.25 <= sho_span <= 0.70:
dbg(f"[SCALE] Shoulder span {sho_span:.3f}m is reasonable")
return True
# Debug why scale check failed
if frames_to_check:
f = frames_to_check[0]
if f in world_landmarks:
wf = world_landmarks[f]
if wf and len(wf) >= 25:
if wf[23] and wf[24]:
hip_span = math.hypot(wf[24][0] - wf[23][0], wf[24][2] - wf[23][2])
dbg(f"[SCALE] Hip span {hip_span:.3f}m out of range [0.20, 0.70]")
if wf[11] and wf[12]:
sho_span = math.hypot(wf[12][0] - wf[11][0], wf[12][2] - wf[11][2])
dbg(f"[SCALE] Shoulder span {sho_span:.3f}m out of range [0.25, 0.70]")
return False
def unit3(v):
"""Normalize 3D vector"""
n = math.sqrt(v[0]*v[0] + v[1]*v[1] + v[2]*v[2])
return np.array(v) / n if n > 1e-6 else np.array([0.0, 0.0, 1.0])
def world_vec(world_landmarks, frame_idx, landmark_name):
"""Get world vector for landmark"""
landmark_map = {'C7': 0, 'L_HIP': 23, 'R_HIP': 24} # C7 approximated as nose for now
if landmark_name == 'C7':
# Approximate C7 as midpoint between shoulders raised slightly
if (frame_idx in world_landmarks and len(world_landmarks[frame_idx]) > 12 and
world_landmarks[frame_idx][11] and world_landmarks[frame_idx][12]):
l_sho = np.array(world_landmarks[frame_idx][11][:3])
r_sho = np.array(world_landmarks[frame_idx][12][:3])
# Raise midpoint slightly to approximate C7
c7_approx = (l_sho + r_sho) / 2 + np.array([0, 0.08, 0]) # 8cm higher
return c7_approx
idx = landmark_map.get(landmark_name)
if idx is None or frame_idx not in world_landmarks:
return None
wf = world_landmarks[frame_idx]
if not wf or len(wf) <= idx or not wf[idx]:
return None
return np.array(wf[idx][:3])
def any_nan(arr):
"""Check if array contains any NaN values"""
return np.any(np.isnan(arr)) if arr is not None else True
def calibrate_sidebend_sign(world_landmarks, impact_idx, handedness, fwd_hat_3d):
"""
Returns +1 or -1: multiplier to convert 2D dx sign into 'away-from-target = +'
Uses short window around impact to vote with 3D sign, falls back to handed rule.
"""
if not has_world_ok_scale(world_landmarks):
return 1 if handedness == 'right' else -1
W = range(max(0, impact_idx-2), min(len(list(world_landmarks.keys())), impact_idx+3))
votes = 0
for i in W:
if i not in world_landmarks:
continue
c7 = world_vec(world_landmarks, i, 'C7')
l_hip = world_vec(world_landmarks, i, 'L_HIP')
r_hip = world_vec(world_landmarks, i, 'R_HIP')
if c7 is None or l_hip is None or r_hip is None:
continue
hip = (l_hip + r_hip) * 0.5
spine = unit3(c7 - hip)
# lateral-right unit on ground plane from forward
lat = unit3(np.cross([0,1,0], fwd_hat_3d))
s3d = np.sign(np.dot(spine, lat)) # + means leaning to player's right
votes += s3d
# majority 3D says which side is player-right; convert to 'away from target = +'
# For RH, away-from-target == player-right; for LH, opposite.
prefer_player_right_positive = np.sign(votes) if votes != 0 else 1
return prefer_player_right_positive if handedness == 'right' else -prefer_player_right_positive
def calculate_torso_sidebend_at_impact(pose_data, swing_phases, frames=None, image_shape=None, handedness='right', world_landmarks=None):
"""
Front-facing: torso side-bend in degrees at impact.
Defined as angle between (neck midpoint → hip midpoint) and screen-vertical,
computed in derolled screen space.
"""
dbg(f"[SB] handed={handedness}")
# --- Resolve impact frame (with a tiny +δ tolerance) ---
impact_frames = swing_phases.get("impact", [])
if impact_frames:
impact_idx = int(round(impact_frames[0]))
else:
# Fallback: find impact using peak hand speed if no swing phases provided
impact_idx = _find_impact_frame(pose_data, world_landmarks)
if impact_idx is None:
impact_idx = sorted(pose_data.keys())[-1] if pose_data else None
# Surgical asserts to guarantee true impact frame
assert impact_idx is not None, "[SB] impact_idx None (missing world_landmarks pass-thru?)"
n = len(pose_data["landmarks"]) if "landmarks" in pose_data else len(pose_data)
assert impact_idx < n-1, f"[SB] Using last frame ({impact_idx}/{n-1}) — not true impact"
dbg(f"[SB] impact_idx={impact_idx} of {n} frames (OK)")
if impact_idx is None:
dbg("[TORSO] No impact frame found")
return {"metric": "torso_side_bend_deg", "value": None, "quality": "no-data"}
num_frames = len(pose_data["landmarks"]) if "landmarks" in pose_data else len(pose_data)
# TEMPORAL SMOOTHING: Use ±2-frame window around impact for better timing
candidates = [impact_idx - 2, impact_idx - 1, impact_idx, impact_idx + 1, impact_idx + 2]
candidates = [i for i in candidates if 0 <= i < num_frames and i in pose_data]
# --- Improved de-roll using eyes + hips (not eyes-only) to reduce drift ---
lo = max(0, impact_idx - 4)
hi = min(num_frames - 1, impact_idx + 5)
combined_rolls = []
for i in range(lo, hi + 1):
if i not in pose_data:
continue
L_EYE, R_EYE = 2, 5
L_HIP, R_HIP = 23, 24
lm = pose_data[i]
if not lm or len(lm) <= max(R_EYE, R_HIP):
continue
# Collect both eye and hip roll angles
frame_rolls = []
# Eye roll
le, re = lm[L_EYE], lm[R_EYE]
if le is not None and re is not None:
(lx, ly), (rx, ry) = le[:2], re[:2]
frame_rolls.append(_eye_roll_deg(lx, ly, rx, ry))
# Hip roll
lh, rh = lm[L_HIP], lm[R_HIP]
if lh is not None and rh is not None:
(lx, ly), (rx, ry) = lh[:2], rh[:2]
frame_rolls.append(_eye_roll_deg(lx, ly, rx, ry))
# Use median of available rolls for this frame
if frame_rolls:
combined_rolls.append(np.median(frame_rolls))
roll_deg = 0.0
if combined_rolls:
arr = np.array(combined_rolls, dtype=float)
lo_q, hi_q = np.percentile(arr, [15, 85])
arr = arr[(arr >= lo_q) & (arr <= hi_q)]
roll_deg = float(np.median(np.clip(arr, -25.0, 25.0)))
dbg(f"[TORSO] eyes+hips roll (trimmed median) = {roll_deg:.1f}°")
# --- Choose best candidate (prefer middle of temporal window) ---
valid_candidates = []
L_SHO, R_SHO, L_HIP, R_HIP = 11, 12, 23, 24
for i in candidates:
lm = pose_data[i]
if not lm or len(lm) <= max(L_SHO, R_SHO, L_HIP, R_HIP):
continue
ls = lm[L_SHO]
rs = lm[R_SHO]
lh = lm[L_HIP]
rh = lm[R_HIP]
if None not in (ls, rs, lh, rh):
valid_candidates.append(i)
if not valid_candidates:
dbg("[TORSO] No valid landmarks around impact")
return {"metric": "torso_side_bend_deg", "value": None, "quality": "no-data"}
# TEMPORAL MEDIAN: Calculate side-bend from all valid frames for stability
def _get_HW():
if frames:
H, W = frames[0].shape[:2]; return float(H), float(W)
if image_shape and len(image_shape) >= 2:
H, W = image_shape[:2]; return float(H), float(W)
return None, None
H, W = _get_HW()
cx, cy = (W / 2.0 if W else 0.0), (H / 2.0 if H else 0.0)
rad = math.radians(-roll_deg)
cr, sr = math.cos(rad), math.sin(rad)
def _deroll(pt):
x, y = pt[:2] # Handle both [x,y] and [x,y,vis] formats
x0, y0 = x - cx, y - cy
xr = cr * x0 - sr * y0 + cx
yr = sr * x0 + cr * y0 + cy
return (xr, yr)
# Calculate side-bend for all valid frames and take median
L_SHO, R_SHO, L_HIP, R_HIP = 11, 12, 23, 24
sidebend_values = []
for frame_idx in valid_candidates:
lm = pose_data[frame_idx]
ls = _deroll(lm[L_SHO])
rs = _deroll(lm[R_SHO])
lh = _deroll(lm[L_HIP])
rh = _deroll(lm[R_HIP])
neck = _midpt(ls, rs)
midhip = _midpt(lh, rh)
dx = neck[0] - midhip[0]
dy = neck[1] - midhip[1] # y grows downward
# 2D frontal-plane magnitude only
dy = abs(dy) + 1e-6 # Ensure positive denominator
sidebend_abs_frame = math.degrees(math.atan2(abs(dx), dy))
sidebend_values.append((dx, sidebend_abs_frame)) # Store both dx and magnitude
# Use median magnitude and median dx sign
if sidebend_values:
dx_values = [sv[0] for sv in sidebend_values]
abs_values = [sv[1] for sv in sidebend_values]
dx = np.median(dx_values) # Median dx for sign
sidebend_abs = np.median(abs_values) # Median magnitude
chosen = valid_candidates[len(valid_candidates)//2] # For debugging reference
else:
return {"metric": "torso_side_bend_deg", "value": None, "quality": "no-data"}
# Get forward direction for sign calibration
fwd_hat_3d = None
if world_landmarks is not None:
setup_frames = swing_phases.get("setup", [])[:5] or sorted(world_landmarks.keys())[:5]
for f in setup_frames:
if f in world_landmarks:
toe_vec_3d = _toe_line_unit_world(world_landmarks[f])
if toe_vec_3d is not None:
# Forward perpendicular to toe line in XZ plane
fwd_hat_3d = np.array([-toe_vec_3d[1], 0, toe_vec_3d[0]])
break
# Standardized sign convention: positive toward target for right-handed
sign_2d_raw = 1.0 if dx > 0 else -1.0
cal = calibrate_sidebend_sign(world_landmarks, impact_idx, handedness, fwd_hat_3d)
# Apply standardized sign convention: positive = away from target
# For right-handed: away from target = toward player's right
# For left-handed: away from target = toward player's left
final_sign = sign_2d_raw * cal
sidebend_deg = final_sign * sidebend_abs
# Guardrails: clip extreme values but warn
if abs(sidebend_deg) > 30:
dbg(f"[SB] rejecting |sb|={abs(sidebend_deg):.1f}° > 30°, check roll/coords")
sidebend_deg = math.copysign(30.0, sidebend_deg)
# Debug output matching expected format
dbg(f"[SB] 2D mag={sidebend_abs:.1f}°, cal={cal:+.0f}, final={sidebend_deg:.1f}°")
dbg(f"[TORSO] side-bend = {sidebend_deg:.1f}° (dx={dx:.1f}, dy={abs(dy):.1f}, abs={sidebend_abs:.1f}°)")
# Optional: shoulder drop % of head height (kept because it's useful)
head_h = pose_data.get("head_h", {}).get(chosen)
drop_pct = None
if head_h and head_h > 0:
# lead vs trail by handedness; y increases downward in image coords
lead_y = ls[1] if handedness == 'right' else rs[1]
trail_y = rs[1] if handedness == 'right' else ls[1]
dy_sho = (lead_y - trail_y)
drop_pct = 100.0 * (dy_sho / head_h)
dbg(f"[TORSO] lead-shoulder drop = {drop_pct:.1f}% head")
# Trust but verify debug printout
dbg(f"[SB] final torso_sidebend={sidebend_deg:.1f}° at frame={chosen}")
return {
"metric": "torso_side_bend_deg",
"value": sidebend_deg,
"quality": "ok",
"details": {
"frame": chosen,
"roll_used_deg": roll_deg,
"shoulder_drop_pct_head": drop_pct
}
}
def calculate_shoulder_tilt_at_impact(*args, **kwargs):
"""
DEPRECATED: Shoulder tilt removed due to occlusion instability.
Returns torso side-bend instead for backward compatibility.
"""
dbg("[DEPRECATION] Shoulder tilt removed. Returning torso side-bend instead.")
res = calculate_torso_sidebend_at_impact(*args, **kwargs)
# keep old key for downstream consumers if needed:
return res["value"] # Return just the numeric value for compatibility
def _find_impact_frame(pose_data, world_landmarks=None):
"""Find impact frame using peak hand speed if not provided in swing phases"""
if not world_landmarks:
return None
frames = sorted(world_landmarks.keys())
if len(frames) < 3:
return None
speeds = []
for i in range(1, len(frames)):
f_curr = frames[i]
f_prev = frames[i-1]
curr_frame = world_landmarks.get(f_curr)
prev_frame = world_landmarks.get(f_prev)
if (curr_frame and len(curr_frame) > 16 and
prev_frame and len(prev_frame) > 16 and
curr_frame[16] and prev_frame[16]): # Right wrist
curr_wrist = curr_frame[16]
prev_wrist = prev_frame[16]
# Calculate 3D speed
dx = curr_wrist[0] - prev_wrist[0]
dy = curr_wrist[1] - prev_wrist[1]
dz = curr_wrist[2] - prev_wrist[2]
speed = math.sqrt(dx*dx + dy*dy + dz*dz)
speeds.append((f_curr, speed))
if speeds:
# Find peak speed frame
peak_frame = max(speeds, key=lambda x: x[1])[0]
# Impact is typically 1 frame before peak speed
impact_frame = peak_frame - 1
return impact_frame if impact_frame in frames else peak_frame
return None
def _target_hat_from_toes_world(world_landmarks, setup_frames, handedness='right'):
"""Build stable target axis from toe line at address with outlier trimming"""
LEFT_TOE, RIGHT_TOE = 31, 32
toe_vecs = []
for f in setup_frames:
if f in world_landmarks:
frame = world_landmarks[f]
if frame and len(frame) > 32:
try:
Ltoe = frame[LEFT_TOE]
Rtoe = frame[RIGHT_TOE]
if Ltoe and Rtoe and len(Ltoe) >= 3 and len(Rtoe) >= 3:
toe_vec = (Rtoe[0]-Ltoe[0], Rtoe[2]-Ltoe[2])
if (toe_vec[0]**2 + toe_vec[1]**2) > 1e-6:
toe_vecs.append(toe_vec)
except Exception:
continue
if not toe_vecs:
dbg("target_axis - NO VALID TOE VECTORS, using default")
return (1.0, 0.0)
# Drop 20% widest-angle outliers before median
toe_vecs = np.array(toe_vecs, dtype=float)
angles = np.degrees(np.arctan2(toe_vecs[:,1], toe_vecs[:,0]))
med, dev = np.median(angles), np.abs(angles - np.median(angles))
keep = dev <= np.percentile(dev, 80)
target_hat = unit2(np.median(toe_vecs[keep], axis=0))
# Optional: flip sign for consistency if you want +X toward target for left-handed
if handedness == 'left':
target_hat = (-target_hat[0], -target_hat[1])
return target_hat
def _pick_stable_setup_frames(sm, swing_phases, max_frames=6):
"""Pick first address frames with low pelvis motion (reduces waggle bias)."""
cand = swing_phases.get("setup", [])[:12]
if not cand:
return []
def pelvis_ctr_px(f):
lm = sm.get(f)
if not lm or not lm[23] or not lm[24]: return None
return np.array([(lm[23][0]+lm[24][0])*0.5, (lm[23][1]+lm[24][1])*0.5], float)
centers = [(f, pelvis_ctr_px(f)) for f in cand]
centers = [(f, c) for f,c in centers if c is not None]
if len(centers) < 2: return [f for f,_ in centers][:max_frames]
# speed threshold: keep earliest frames whose step-to-step pelvis motion is small
speeds = []
for i in range(1, len(centers)):
f0,c0 = centers[i-1]; f1,c1 = centers[i]
speeds.append((f1, float(np.linalg.norm(c1-c0))))
if not speeds: return [centers[0][0]]
# pick prefix until motion inflates (waggle begins)
thr = max(1.5, np.median([s for _,s in speeds])*2.5)
keep = [centers[0][0]]
for f,s in speeds:
if s <= thr and len(keep) < max_frames:
keep.append(f)
else:
break
return keep
def _build_foot_only_homography(sm, setup_frames):
"""
Build ground-plane H from 4 foot points only (no hips).
MediaPipe indices: 27=L_heel, 28=R_heel, 29=L_ankle, 30=R_ankle, 31=L_foot_index, 32=R_foot_index
Image pts use (31,32) for toes or (29,30) for ankles as fallback.
Ground pts are a rectangle in arbitrary inches; scale will be calibrated later.
"""
import cv2
for f in setup_frames:
lm = sm.get(f)
if not lm: continue
needed = [29,30,31,32]
if any((i>=len(lm) or lm[i] is None or lm[i][2] < 0.5) for i in needed):
continue
# Image points - use consistent ankle/toe mapping
img = np.array([
[lm[29][0], lm[29][1]], # L_ankle -> treat as heel
[lm[30][0], lm[30][1]], # R_ankle -> heel
[lm[31][0], lm[31][1]], # L_foot_index -> toe
[lm[32][0], lm[32][1]], # R_foot_index -> toe
], dtype=np.float32)
# Ground rectangle (left/right must match the image ordering above)
ground = np.array([
[-8.0, 0.0], # L heel
[ 8.0, 0.0], # R heel
[-8.0, 11.0], # L toe
[ 8.0, 11.0], # R toe
], dtype=np.float32)
H, mask = cv2.findHomography(img, ground, cv2.RANSAC, 2.0)
if H is None:
continue
det = abs(np.linalg.det(H[:2,:2]))
if det > 1e-5:
return H, True, f
return None, False, None
def _simple_pixel_sway_fallback(sm, setup_frames, top_frame, swing_phases=None, world_landmarks=None):
"""Improved pixel-based sway fallback with world landmark scale calibration"""
def get_lm(frame_idx):
return sm.get(frame_idx)
# pick a setup frame that has feet + hips
setup_ok = [f for f in setup_frames
if get_lm(f) and get_lm(f)[23] and get_lm(f)[24] and get_lm(f)[31] and get_lm(f)[32]]
if not setup_ok:
return 0.0
f0 = setup_ok[0]
lm0 = get_lm(f0)
# pelvis centers (pixels)
def pelvis_ctr_px(f):
lm = get_lm(f)
if not lm or not lm[23] or not lm[24]:
return None
return np.array([(lm[23][0] + lm[24][0]) * 0.5,
(lm[23][1] + lm[24][1]) * 0.5], dtype=float)
setup_ctrs = [pelvis_ctr_px(f) for f in setup_ok]
setup_ctrs = [c for c in setup_ctrs if c is not None]
if not setup_ctrs:
return 0.0
setup_ctr = np.median(np.stack(setup_ctrs), axis=0)
top_ctr = pelvis_ctr_px(top_frame)
if top_ctr is None:
return 0.0
# toe vector in pixels from the same setup frame
toe_v = np.array([lm0[32][0] - lm0[31][0],
lm0[32][1] - lm0[31][1]], dtype=float)
n = np.linalg.norm(toe_v)
if n < 1e-6:
# fallback to horizontal approximation if toes are missing
toe_hat_px = np.array([1.0, 0.0])
else:
toe_hat_px = toe_v / n
# Target axis (toe line itself) in pixel space
target_hat_px = toe_hat_px / (np.linalg.norm(toe_hat_px) + 1e-9)
# --- NEW: sign disambiguation using address -> impact pelvis centers in pixel space ---
if swing_phases and swing_phases.get("impact"):
f_imp = swing_phases["impact"][0]
# median address pelvis center in px
addr_ctrs = [pelvis_ctr_px(f) for f in setup_ok]
addr_ctrs = [c for c in addr_ctrs if c is not None]
imp_ctr = pelvis_ctr_px(f_imp)
if addr_ctrs and imp_ctr is not None:
addr_ctr = np.median(np.stack(addr_ctrs), axis=0)
fwd_vec_px = imp_ctr - addr_ctr
if float(np.dot(fwd_vec_px, target_hat_px)) < 0.0:
dbg("[SWAY] Flipped target_hat_px based on addr→impact motion")
target_hat_px = -target_hat_px
delta_px = top_ctr - setup_ctr
sway_px = float(np.dot(delta_px, target_hat_px))
# Improved px→inch scale using world landmarks with consistency check
px_per_inch_candidates = []
default_px_per_inch = 10.0
if world_landmarks is not None:
# Try to get px_per_inch from multiple setup frames
for f in setup_ok:
if f in world_landmarks:
try:
wf = world_landmarks[f]
lm = get_lm(f)
L_FOOT_INDEX, R_FOOT_INDEX = 31, 32 # foot_index points (toe tips)
Lt, Rt = wf[L_FOOT_INDEX], wf[R_FOOT_INDEX]
if Lt and Rt and lm and lm[31] and lm[32]:
# 3D toe spacing in meters → inches
d_m = math.hypot(Rt[0]-Lt[0], Rt[2]-Lt[2])
d_in_true = d_m * 39.3701
# Pixel toe spacing for this frame
toe_v_frame = np.array([lm[32][0] - lm[31][0], lm[32][1] - lm[31][1]], dtype=float)
d_px = float(np.linalg.norm(toe_v_frame))
if d_px > 1e-3 and d_in_true > 1e-3:
candidate = d_px / d_in_true
# Sanity check: reasonable range for px_per_inch (5-25 depending on resolution/crop)
if 5.0 <= candidate <= 25.0:
px_per_inch_candidates.append(candidate)
dbg(f"pixel_fallback: frame {f} px_per_inch={candidate:.2f}")
except Exception:
continue
# Apply consistency check: reject frames with >10% deviation from median
if len(px_per_inch_candidates) > 1:
initial_median = np.median(px_per_inch_candidates)
consistent_candidates = []
for val in px_per_inch_candidates:
deviation_pct = abs(val - initial_median) / initial_median * 100
if deviation_pct <= 10.0: # Within 10% of median
consistent_candidates.append(val)
else:
dbg(f"pixel_fallback: rejecting inconsistent px_per_inch={val:.2f} (deviation={deviation_pct:.1f}%)")
px_per_inch_candidates = consistent_candidates
# Use median of candidates if available, otherwise use default
if px_per_inch_candidates:
px_per_inch = float(np.median(px_per_inch_candidates))
dbg(f"pixel_fallback: median px_per_inch={px_per_inch:.2f} from {len(px_per_inch_candidates)} consistent frames")
else:
px_per_inch = default_px_per_inch
dbg(f"pixel_fallback: using default px_per_inch={px_per_inch:.2f}")
sway_inches = sway_px / px_per_inch
# Apply same robust baseline approach to pixel fallback
if len(setup_ctrs) >= 2:
setup_x_px = [c[0] for c in setup_ctrs]
t = np.arange(len(setup_x_px))
m, b = np.polyfit(t, setup_x_px, 1) if len(setup_x_px) >= 2 else (0, setup_x_px[0])
residuals = np.array(setup_x_px) - (m * t + b)
mad_residuals = np.median(np.abs(residuals - np.median(residuals)))
jitter_px = 1.4826 * mad_residuals
jitter_in = jitter_px / px_per_inch
else:
jitter_in = 0.1 # Minimal fallback
# Calculate trust ratio
sway_jitter_ratio = abs(sway_inches) / max(jitter_in, 1e-6)
trust_flag = "trusted" if sway_jitter_ratio >= 1.5 else "low-trust"
# Normalize by stance width if using pixel fallback and world landmarks available
normalized_sway = None
stance_width_in = None
if world_landmarks:
stance_widths = []
for f in setup_ok:
if f in world_landmarks:
try:
Lt_w, Rt_w = world_landmarks[f][31], world_landmarks[f][32]
toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3]))
stance_widths.append(float(toe_world_m * 39.3701)) # Convert to inches
except:
pass
if stance_widths:
stance_width_in = np.median(stance_widths)
if 14 <= stance_width_in <= 28:
normalized_sway = sway_inches / stance_width_in
# Enhanced pixel fallback logging matching main function
norm_str = f"{normalized_sway:.3f}" if normalized_sway is not None else "NA"
dbg(f"pixel_fallback: dx={sway_inches:.2f}in, norm={norm_str}, jitter={jitter_in:.2f}in, ratio={sway_jitter_ratio:.1f}, trust={trust_flag}")
if stance_width_in:
dbg(f"pixel_fallback: stance_width={stance_width_in:.1f}\", target_normalized≈0.16 for pros")
return sway_inches
def _subframe_impact_refinement(world_landmarks: Dict[int, List], impact_candidate: int) -> float:
"""
Refine impact timing to sub-frame precision using parabolic fit to hand speed
Fits parabola around the impact candidate and returns fractional frame timestamp
"""
try:
# Get frames around impact candidate for parabolic fit
test_frames = [f for f in range(impact_candidate-2, impact_candidate+3)
if f in world_landmarks]
if len(test_frames) < 3:
return float(impact_candidate) # Not enough data for refinement
# Calculate hand speeds around impact
speeds = []
frame_indices = []
for i in range(1, len(test_frames)):
f_curr = test_frames[i]
f_prev = test_frames[i-1]
curr_frame = world_landmarks[f_curr]
prev_frame = world_landmarks[f_prev]
if (curr_frame and len(curr_frame) > 16 and
prev_frame and len(prev_frame) > 16 and
curr_frame[16] and prev_frame[16]):
curr_wrist = curr_frame[16]
prev_wrist = prev_frame[16]
# Calculate 3D speed
dx = curr_wrist[0] - prev_wrist[0]
dy = curr_wrist[1] - prev_wrist[1]
dz = curr_wrist[2] - prev_wrist[2]
speed = math.sqrt(dx*dx + dy*dy + dz*dz)
speeds.append(speed)
frame_indices.append(f_curr)
if len(speeds) < 3:
return float(impact_candidate)
# Fit parabola to speed curve: speed = a*t^2 + b*t + c
# Find peak (maximum) of parabola
t = np.array(frame_indices, dtype=float)
s = np.array(speeds)
# Polynomial fit (degree 2)
coeffs = np.polyfit(t, s, 2)
a, b, c = coeffs
if a >= 0: # Parabola opens upward → no maximum
return float(impact_candidate)
# Maximum at t = -b/(2a)
peak_frame_fractional = -b / (2*a)
# Impact happens just before peak (typically 0.5-1.0 frames before)
# Use the original logic of peak-1, but with fractional precision
impact_fractional = peak_frame_fractional - 0.7 # Refined offset
# Constrain to reasonable range around original estimate
if abs(impact_fractional - impact_candidate) > 2.0:
impact_fractional = float(impact_candidate)
dbg(f"subframe_refinement - parabola_coeffs=[{a:.6f}, {b:.6f}, {c:.6f}]")
dbg(f"subframe_refinement - peak_frame={peak_frame_fractional:.2f}, impact_refined={impact_fractional:.2f}")
return impact_fractional
except Exception as e:
dbg(f"subframe_refinement - failed: {e}, using integer frame")
return float(impact_candidate)
def _address_frames(swing_phases: Dict[str, List]) -> List[int]:
"""Get address/setup frames"""
return swing_phases.get("setup", [])[:10]
def _impact_frame(world_landmarks: Dict[int, List], swing_phases: Dict[str, List]) -> Optional[int]:
"""Find impact using peak lead-hand speed - 1 frame with inline refinement"""
downswing_frames = swing_phases.get("downswing", [])
impact_frames = swing_phases.get("impact", [])
analysis_frames = sorted(downswing_frames + impact_frames)
if len(analysis_frames) < 5:
return impact_frames[0] if impact_frames else None
# Calculate hand speeds
hand_speeds = []
valid_frames = []
for i in range(1, len(analysis_frames)):
f_curr = analysis_frames[i]
f_prev = analysis_frames[i-1]
if f_curr in world_landmarks and f_prev in world_landmarks:
curr_frame = world_landmarks[f_curr]
prev_frame = world_landmarks[f_prev]
if (curr_frame and len(curr_frame) > 16 and
prev_frame and len(prev_frame) > 16):
# Use lead hand (left wrist for right-handed golfer)
wrist_idx = 15 # Lead wrist for right-handed
curr_wrist = curr_frame[wrist_idx]
prev_wrist = prev_frame[wrist_idx]
if (curr_wrist and len(curr_wrist) >= 3 and
prev_wrist and len(prev_wrist) >= 3):
dx = curr_wrist[0] - prev_wrist[0]
dy = curr_wrist[1] - prev_wrist[1]
dz = curr_wrist[2] - prev_wrist[2]
speed = math.sqrt(dx*dx + dy*dy + dz*dz)
hand_speeds.append(speed)
valid_frames.append(f_curr)
if len(hand_speeds) >= 3:
# Find peak speed frame
max_speed_idx = np.argmax(hand_speeds)
peak_speed_frame = valid_frames[max_speed_idx]
# Initial estimate: peak speed - 1 frame
impact_candidate = peak_speed_frame
if max_speed_idx > 0:
impact_candidate = valid_frames[max_speed_idx - 1]
# Clamp to ±2 frames around original estimate
original = valid_frames[max_speed_idx - 1] if max_speed_idx > 0 else peak_speed_frame
impact_candidate = max(original - 2, min(original + 2, impact_candidate))
return impact_candidate
return impact_frames[0] if impact_frames else None
# Removed complex 2D fallback function - using simple direct measurement only
# ================== HIP-SHOULDER SEPARATION METRIC ==================
def _angle_deg(p_left: np.ndarray, p_right: np.ndarray) -> float:
"""
Angle of the line Left->Right relative to screen-horizontal, in degrees.
Image coords: x right+, y down+.
"""
dx = float(p_right[0] - p_left[0])
dy = float(p_right[1] - p_left[1])
return np.degrees(np.arctan2(dy, dx))
def _wrap_deg(a: float) -> float:
"""Wrap to (-180, 180]."""
a = (a + 180.0) % 360.0 - 180.0
return a
def _span(p_left: np.ndarray, p_right: np.ndarray) -> float:
"""Euclidean span in pixels."""
return float(np.hypot(p_right[0] - p_left[0], p_right[1] - p_left[1]))
def _trimmed_median(values: List[float], trim: float = 0.2) -> float:
"""Robust center estimate."""
if not values:
return 0.0
arr = np.sort(np.asarray(values, dtype=float))
n = len(arr)
k = int(np.floor(trim * n))
arr = arr[k: n - k] if n - 2*k > 0 else arr
return float(np.median(arr))
def _circular_mean_deg(values: List[float]) -> float:
"""Circular mean for angle values to avoid wrap artifacts."""
if not values:
return 0.0
a = np.radians(values)
return float(np.degrees(np.arctan2(np.mean(np.sin(a)), np.mean(np.cos(a)))))
def estimate_global_roll_deg(
lm_seq: np.ndarray,
setup_idxs: List[int],
use_eyes: bool = True
) -> float:
"""
Estimate camera roll from early setup frames using eyes + shoulders only.
Uses circular mean to avoid wrap artifacts. Excludes hips to prevent bias from pelvis rotation.
Returns a single roll angle in degrees (to subtract from later line angles).
"""
eye_angles, sho_angles = [], []
for t in setup_idxs:
if t >= lm_seq.shape[0]:
continue
lm = lm_seq[t]
try:
if use_eyes and len(lm) > max(L_EYE, R_EYE):
eye_angles.append(_angle_deg(lm[L_EYE], lm[R_EYE]))
if len(lm) > max(L_SHO, R_SHO):
sho_angles.append(_angle_deg(lm[L_SHO], lm[R_SHO]))
except Exception:
continue
# Combine all angles for circular mean (exclude hips to avoid pelvis bias)
all_angles = []
if eye_angles: all_angles.extend(eye_angles)
if sho_angles: all_angles.extend(sho_angles)
if not all_angles:
return 0.0
# Use circular mean to avoid wrap artifacts like 171° instead of -18°
return _circular_mean_deg(all_angles)
def hip_shoulder_separation_deg(
lm_seq: np.ndarray,
idx: int,
setup_idxs: List[int],
handed: str = "R",
min_span_px: float = 20.0,
smooth_half_window: int = 2, # 5-frame median around idx
apply_yaw_correction: bool = True
) -> Dict[str, float]:
"""
Compute signed hip-shoulder separation at a given frame.
Positive (right-handed) => hips more open than shoulders (hips leading).
For left-handed, sign is flipped so positive still means hips leading.
New features:
- Temporal smoothing: Uses median over ±smooth_half_window frames around idx
- Yaw compensation: Corrects for camera angle by comparing shoulder spans
- Geometry confidence: Flags when foreshortening may affect accuracy
Returns: dict with:
'hip_shoulder_sep_deg': Corrected separation angle
'hip_shoulder_sep_abs_deg': Absolute value of separation
'confidence': 0-1 quality of landmark detection
'span_ratio': Shoulder span at idx vs address (for foreshortening check)
'geometry_ok': True if camera appears square to player
"""
T = lm_seq.shape[0]
# Neighborhood for temporal median smoothing
lo = max(0, idx - smooth_half_window)
hi = min(T - 1, idx + smooth_half_window)
window = range(lo, hi + 1)
# Estimate roll from setup
roll_deg = estimate_global_roll_deg(lm_seq, setup_idxs, use_eyes=True)
hip_angles, sho_angles, confs = [], [], []
for t in window:
if t >= T:
continue
lm = lm_seq[t]
if len(lm) <= max(L_SHO, R_SHO, L_HIP, R_HIP):
continue
Ls, Rs = lm[L_SHO], lm[R_SHO]
Lh, Rh = lm[L_HIP], lm[R_HIP]
# Confidence via spans
span_sho = _span(Ls, Rs)
span_hip = _span(Lh, Rh)
good = float((span_sho >= min_span_px) and (span_hip >= min_span_px))
confs.append(good)
# Raw angles
a_sho = _angle_deg(Ls, Rs) - roll_deg
a_hip = _angle_deg(Lh, Rh) - roll_deg
# Wrap both post de-roll
a_sho = _wrap_deg(a_sho)
a_hip = _wrap_deg(a_hip)
sho_angles.append(a_sho)
hip_angles.append(a_hip)
if not sho_angles or not hip_angles:
return dict(hip_shoulder_sep_deg=0.0, hip_shoulder_sep_abs_deg=0.0, confidence=0.0)
# Temporal median to reduce jitter
a_sho_med = float(np.median(sho_angles))
a_hip_med = float(np.median(hip_angles))
conf = float(np.mean(confs)) # 0..1 proportion of frames with decent span
# Separation: hips minus shoulders
sep = _wrap_deg(a_hip_med - a_sho_med)
# DEBUG PRINT: Add debugging info for the angles and separation
print(f"[DBG] roll={roll_deg:.1f}°, a_hip_med={a_hip_med:.1f}°, a_sho_med={a_sho_med:.1f}°, sep={sep:.1f}°")
# FORESHORTENING CHECK: Compare shoulder span from address to impact
span_addr = None
span_ratio = 1.0
geom_ok = True
severe_foreshortening = False
if apply_yaw_correction and setup_idxs:
try:
# Calculate median shoulder and hip spans during address
addr_sho_spans, addr_hip_spans = [], []
for t in setup_idxs:
if t < lm_seq.shape[0]:
lm = lm_seq[t]
if len(lm) > max(L_SHO, R_SHO):
addr_sho_spans.append(_span(lm[L_SHO], lm[R_SHO]))
if len(lm) > max(L_HIP, R_HIP):
addr_hip_spans.append(_span(lm[L_HIP], lm[R_HIP]))
if addr_sho_spans:
span_addr_sho = np.median(addr_sho_spans)
span_addr_hip = np.median(addr_hip_spans) if addr_hip_spans else span_addr_sho
# Current frame spans
lm_current = lm_seq[idx] if idx < lm_seq.shape[0] else None
if lm_current is not None and len(lm_current) > max(L_SHO, R_SHO):
span_imp_sho = _span(lm_current[L_SHO], lm_current[R_SHO])
span_imp_hip = _span(lm_current[L_HIP], lm_current[R_HIP]) if len(lm_current) > max(L_HIP, R_HIP) else span_imp_sho
# Calculate ratios and use the better (less compressed) one
ratio_sho = span_imp_sho / (span_addr_sho + 1e-6)
ratio_hip = span_imp_hip / (span_addr_hip + 1e-6)
span_ratio = max(ratio_sho, ratio_hip) # Use less-compressed ratio
print(f"[DBG] shoulder_span addr={span_addr_sho:.1f}px, impact={span_imp_sho:.1f}px, ratio_sho={ratio_sho:.2f}")
print(f"[DBG] hip_span addr={span_addr_hip:.1f}px, impact={span_imp_hip:.1f}px, ratio_hip={ratio_hip:.2f}")
print(f"[DBG] using_ratio={span_ratio:.2f} (better of sho/hip)")
# Apply clamping to realistic range (0.7-1.0) to avoid underestimation
ratio_clamped = min(1.0, max(0.7, span_ratio))
sep_raw = sep
sep = sep / ratio_clamped
print(f"[DBG] sep_raw={sep_raw:.1f}°, sep_corr={sep:.1f}° (ratio_clamped={ratio_clamped:.2f})")
# Geometry confidence flag - flag low-confidence results when span ratio <0.5
geom_ok = span_ratio >= 0.7 # reasonable geometry
severe_foreshortening = span_ratio < 0.5 # flag low-confidence results
except Exception as e:
print(f"[DBG] Foreshortening calculation failed: {e}")
# Handedness normalization: keep "positive = hips leading" for both
if handed.upper().startswith("L"):
sep *= -1.0
# Adjust confidence based on foreshortening severity
final_conf = conf
if severe_foreshortening:
final_conf *= 0.2 # Very low confidence for severe foreshortening (span ratio <0.5)
elif not geom_ok:
final_conf *= 0.6 # Reduced confidence for geometry issues (span ratio <0.7)
return dict(
hip_shoulder_sep_deg=float(sep),
hip_shoulder_sep_abs_deg=float(abs(sep)),
confidence=float(final_conf),
span_ratio=float(span_ratio),
geometry_ok=bool(geom_ok),
severe_foreshortening=bool(severe_foreshortening)
)
def metric_at_address_top_impact(
lm_seq: np.ndarray,
address_idxs: List[int],
top_idx: Optional[int],
impact_idx: int,
handed: str = "R"
) -> Dict[str, Dict[str, float]]:
"""
Compute the metric at address (median over address_idxs), top (optional), and impact.
address_idxs: e.g., first 8–12 frames while static at setup.
"""
out = {}
# Address: use address window both for setup roll and target idx smoothing
addr_idx = int(np.median(address_idxs))
out["address"] = hip_shoulder_separation_deg(
lm_seq, addr_idx, setup_idxs=address_idxs, handed=handed
)
if top_idx is not None:
out["top"] = hip_shoulder_separation_deg(
lm_seq, top_idx, setup_idxs=address_idxs, handed=handed
)
out["impact"] = hip_shoulder_separation_deg(
lm_seq, impact_idx, setup_idxs=address_idxs, handed=handed
)
return out
def calculate_hip_shoulder_separation_at_impact(
pose_data: Dict[int, List],
swing_phases: Dict[str, List],
handedness: str = 'right'
) -> Dict[str, float]:
"""
Calculate hip-shoulder separation at impact using the new metric.
Args:
pose_data: Dictionary mapping frame indices to pose keypoints
swing_phases: Dictionary mapping phase names to lists of frame indices
handedness: 'right' or 'left' handed player
Returns:
Dict with hip_shoulder_sep_deg, hip_shoulder_sep_abs_deg, confidence
"""
# Convert pose_data to numpy array format expected by the metric
if not pose_data:
return {"hip_shoulder_sep_deg": None, "hip_shoulder_sep_abs_deg": None, "confidence": 0.0}
# Get frame indices
frame_indices = sorted(pose_data.keys())
max_frame_idx = max(frame_indices)
# Create numpy array with shape [T, N, 2] where N >= 33 (Mediapipe format)
lm_seq = np.zeros((max_frame_idx + 1, 33, 2), dtype=float)
# Fill the array with pose data
for frame_idx, landmarks in pose_data.items():
if landmarks and len(landmarks) >= 25: # Need at least hip landmarks
for i, lm in enumerate(landmarks):
if i < 33 and lm and len(lm) >= 2:
lm_seq[frame_idx, i, 0] = lm[0] # x
lm_seq[frame_idx, i, 1] = lm[1] # y
# Get swing phase indices
setup_frames = swing_phases.get("setup", [])
impact_frames = swing_phases.get("impact", [])
if not setup_frames or not impact_frames:
dbg("[HIP-SHO] Missing setup or impact frames")
return {"hip_shoulder_sep_deg": None, "hip_shoulder_sep_abs_deg": None, "confidence": 0.0}
# Use first 8-10 setup frames for address
address_idxs = setup_frames[:min(10, len(setup_frames))]
impact_idx = impact_frames[0]
# Convert handedness format
handed = "R" if handedness.lower().startswith('r') else "L"
try:
# Calculate the metric at impact (5-frame median for temporal smoothing)
result = hip_shoulder_separation_deg(
lm_seq=lm_seq,
idx=impact_idx,
setup_idxs=address_idxs,
handed=handed,
smooth_half_window=2 # 5-frame median around impact (±2 frames)
)
# Enhanced debug output with geometry information
geom_warning = ""
if result.get('severe_foreshortening', False):
geom_warning = " (severely underest. - severe foreshortening)"
elif not result.get('geometry_ok', True):
geom_warning = " (underest. likely; yaw/foreshortening)"
dbg(f"[HIP-SHO] Impact separation: {result['hip_shoulder_sep_deg']:.1f}°, confidence: {result['confidence']:.2f}, span_ratio: {result.get('span_ratio', 1.0):.2f}{geom_warning}")
return result
except Exception as e:
dbg(f"[HIP-SHO] Calculation failed: {e}")
return {"hip_shoulder_sep_deg": None, "hip_shoulder_sep_abs_deg": None, "confidence": 0.0}
# ================== END HIP-SHOULDER SEPARATION METRIC ==================
def calculate_hip_sway_at_top(pose_data: Dict[int, List], swing_phases: Dict[str, List],
world_landmarks: Optional[Dict[int, List]] = None) -> Union[float, None]:
# Apply additional temporal smoothing (Savitzky-Golay or EWMA) before measurement
sm = smooth_landmarks(pose_data, alpha=0.25) # Stronger smoothing for hip sway
setup_frames = _pick_stable_setup_frames(sm, swing_phases, max_frames=6)
if not setup_frames: return None
backswing_frames = swing_phases.get("backswing", [])
if not backswing_frames: return None
top_frame = _pick_top_by_wrist_height(sm, backswing_frames) or backswing_frames[-1]
H, ok, H_frame = _build_foot_only_homography(sm, setup_frames)
if not ok or H is None:
# simple pixel fallback with world toes scale if present
return _simple_pixel_sway_fallback(sm, setup_frames, top_frame, swing_phases, world_landmarks)
import cv2
def warp2(pts):
A = np.array(pts, dtype=np.float32).reshape(1, -1, 2)
return cv2.perspectiveTransform(A, H)[0]
# forward axis = perp to toe line in ground plane
toe_vecs = []
for f in setup_frames:
lm = sm.get(f)
if not lm or not lm[31] or not lm[32]: continue
g = warp2([[lm[31][0], lm[31][1]], [lm[32][0], lm[32][1]]])
v = g[1] - g[0]
n = np.linalg.norm(v)
if n > 1e-6: toe_vecs.append(v/n)
if not toe_vecs: return None
toe_hat = np.median(np.stack(toe_vecs), axis=0); toe_hat /= (np.linalg.norm(toe_hat)+1e-9)
fwd_hat = np.array([-toe_hat[1], toe_hat[0]])
fwd_hat /= (np.linalg.norm(fwd_hat)+1e-9)
# sign disambiguation with addr→impact pelvis motion (optional but stable)
imp = swing_phases.get("impact", [])
cand = imp[0] if imp else backswing_frames[-1]
def pelvis_ctr_g(f):
lm = sm.get(f);
if not lm or not lm[23] or not lm[24]: return None
g = warp2([[lm[23][0], lm[23][1]], [lm[24][0], lm[24][1]]]);
return (g[0]+g[1])/2
addr_ctrs = [pelvis_ctr_g(f) for f in setup_frames]; addr_ctrs = [c for c in addr_ctrs if c is not None]
imp_ctr = pelvis_ctr_g(cand)
if addr_ctrs and imp_ctr is not None:
addr_ctr = np.median(np.stack(addr_ctrs), axis=0)
if float(np.dot(imp_ctr-addr_ctr, fwd_hat)) < 0:
dbg("[SWAY] Flipped fwd_hat based on addr→impact pelvis motion")
fwd_hat = -fwd_hat
# Establish robust address baseline to stop drift from inflating jitter
setup_ctr = np.median(np.stack(addr_ctrs), axis=0)
# Robust baseline with detrending: use tight window around address
addr_ctrs_array = np.stack(addr_ctrs)
addr_x_positions = addr_ctrs_array[:, 0] # X coordinates only
# Light detrend to remove slow drift
t = np.arange(len(addr_x_positions))
if len(addr_x_positions) >= 2:
m, b = np.polyfit(t, addr_x_positions, 1) # Linear fit
residuals = addr_x_positions - (m * t + b)
# Robust jitter using MAD (Median Absolute Deviation)
mad_residuals = np.median(np.abs(residuals - np.median(residuals)))
setup_jitter_px = 1.4826 * mad_residuals # MAD to std conversion
addr_baseline_px = b # Robust address baseline
else:
setup_jitter_px = 0.0
addr_baseline_px = addr_x_positions[0] if len(addr_x_positions) > 0 else setup_ctr[0]
dbg(f"[SWAY] Robust baseline: jitter_px={setup_jitter_px:.2f} (was std-based), addr_baseline={addr_baseline_px:.1f}")
setup_jitter = setup_jitter_px # Use robust jitter
top_ctr = pelvis_ctr_g(top_frame)
if top_ctr is None: return None
# Apply jitter-aware correction - reduce small movements that may be noise
raw_delta = top_ctr - setup_ctr
if np.linalg.norm(raw_delta) < setup_jitter * 1.5:
dbg(f"[SWAY] Movement ({np.linalg.norm(raw_delta):.2f}) < 1.5x setup jitter ({setup_jitter:.2f}), applying correction")
# Partial correction for movements smaller than jitter threshold
correction_factor = max(0.0, (np.linalg.norm(raw_delta) - setup_jitter) / np.linalg.norm(raw_delta)) if np.linalg.norm(raw_delta) > 0 else 0.0
corrected_delta = raw_delta * correction_factor
top_ctr = setup_ctr + corrected_delta
# Validate vertical drift but DO NOT repick top to minimize it
TOP_DRIFT_GU_WARN = 3.0 # warn if pelvis vertical drift > 3 gu, but DO NOT change top
TOP_DRIFT_GU_REJECT = 6.0 # only reject if absurd
vertical_drift = abs(top_ctr[1] - setup_ctr[1]) # Ground-plane Y difference
if vertical_drift > TOP_DRIFT_GU_REJECT:
dbg(f"[SWAY] vertical drift {vertical_drift:.1f} gu -> unreliable sway")
return None
elif vertical_drift > TOP_DRIFT_GU_WARN:
dbg(f"[SWAY] vertical drift {vertical_drift:.1f} gu (warn)")
# DO NOT repick top based on vertical drift - top must be picked by club kinematics
# Ground-plane calibration: gu_per_inch stabilization with consistency check
gu_per_inch_list = []
use_pixel_fallback = False
for k in setup_frames:
if world_landmarks and k in world_landmarks:
try:
# 1) World toe distance (meters -> inches)
Lt_w, Rt_w = world_landmarks[k][31], world_landmarks[k][32]
toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3]))
toe_world_in = float(toe_world_m * 39.3701)
# 2) Ground-plane toe distance (same frame, via homography)
lmK = sm.get(k)
if lmK and lmK[31] and lmK[32]:
toe_g = warp2([
[lmK[31][0], lmK[31][1]],
[lmK[32][0], lmK[32][1]],
]) # shape (2,2)
toe_g_dist = float(np.linalg.norm(toe_g[0] - toe_g[1])) # ground units
if toe_g_dist > 1e-3 and toe_world_in > 1e-3:
gu_per_in = toe_g_dist / max(toe_world_in, 1e-6) # ground-units per inch
# Homography plausibility check: stance width should be ~8–30 inches
if 8.0 <= toe_world_in <= 30.0 and 0.1 <= gu_per_in <= 10.0:
gu_per_inch_list.append(gu_per_in)
dbg(f"[SWAY] frame {k}: toe_g={toe_g_dist:.2f} gu, toe_world={toe_world_in:.2f} in, gu_per_in={gu_per_in:.3f}")
else:
dbg(f"[SWAY] implausible frame {k}: toe_world_in={toe_world_in:.2f}, gu_per_in={gu_per_in:.3f}")
except Exception as e:
dbg(f"[SWAY] calibration failed for frame {k}: {e}")
# Reject frames with inconsistent calibration (>10% deviation from median)
if len(gu_per_inch_list) > 1:
initial_median = np.median(gu_per_inch_list)
consistent_values = []
for val in gu_per_inch_list:
deviation_pct = abs(val - initial_median) / initial_median * 100
if deviation_pct <= 10.0: # Within 10% of median
consistent_values.append(val)
else:
dbg(f"[SWAY] Rejecting inconsistent gu_per_in={val:.3f} (deviation={deviation_pct:.1f}%)")
if consistent_values:
gu_per_inch_list = consistent_values
dbg(f"[SWAY] Kept {len(consistent_values)}/{len(gu_per_inch_list)} consistent calibration values")
if gu_per_inch_list:
gu_per_in = np.median(gu_per_inch_list)
gu_per_in = float(np.clip(gu_per_in, 0.1, 10.0)) # ground-plane appropriate range
use_pixel_fallback = False
dbg(f"[SWAY] using ground calibration: median gu_per_in={gu_per_in:.3f} from {len(gu_per_inch_list)} frames")
else:
dbg("[SWAY] no consistent ground calibration; will try pixel fallback")
use_pixel_fallback = True
gu_per_in = 1.0
# Option B pixel fallback with consistency check
if use_pixel_fallback and world_landmarks and setup_frames:
px_per_inch_list = []
for k in setup_frames:
if k in world_landmarks:
try:
Lt_w, Rt_w = world_landmarks[k][31], world_landmarks[k][32]
toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3]))
toe_world_in = float(toe_world_m * 39.3701)
lmK = sm.get(k)
if lmK and lmK[31] and lmK[32]:
# Image pixel distance
toe_px = float(np.linalg.norm(np.array(lmK[31][:2]) - np.array(lmK[32][:2])))
if toe_px > 1e-3 and toe_world_in > 1e-3:
px_per_in = toe_px / max(toe_world_in, 1e-6)
if 3.0 <= px_per_in <= 60.0: # image pixels appropriate range
px_per_inch_list.append(px_per_in)
dbg(f"[SWAY] FALLBACK frame {k}: toe_px={toe_px:.1f}, toe_world_in={toe_world_in:.2f}, px_per_in={px_per_in:.2f}")
except:
pass
# Apply same consistency check to pixel fallback
if len(px_per_inch_list) > 1:
initial_median = np.median(px_per_inch_list)
consistent_px_values = []
for val in px_per_inch_list:
deviation_pct = abs(val - initial_median) / initial_median * 100
if deviation_pct <= 10.0: # Within 10% of median
consistent_px_values.append(val)
else:
dbg(f"[SWAY] Rejecting inconsistent px_per_in={val:.2f} (deviation={deviation_pct:.1f}%)")
px_per_inch_list = consistent_px_values
if px_per_inch_list:
gu_per_in = np.median(px_per_inch_list) # reuse variable name for simplicity
dbg(f"[SWAY] using pixel fallback: median px_per_in={gu_per_in:.2f} from {len(px_per_inch_list)} frames")
else:
gu_per_in = 1.0
dbg("[SWAY] all calibration failed, using unit scale")
inch_scale = 1.0 / gu_per_in if gu_per_in > 0 else 1.0
# Measure ∆x in ground units end-to-end for consistency
# Convert pelvis positions to ground units
setup_ctr_gu = np.array([addr_baseline_px, setup_ctr[1]]) # Use robust baseline
top_ctr_gu = top_ctr
# Two-stage denoising: apply rolling median + EWMA smoothing to top position
# (Note: setup already detrended above)
if len(addr_ctrs) >= 3:
# Simple moving median for top position stability (if multiple candidates available)
top_candidates = []
for f_nearby in range(max(setup_frames[0], top_frame-2), min(setup_frames[-1], top_frame+3)):
if f_nearby in range(len(pose_data)) and f_nearby != top_frame:
nearby_ctr = pelvis_ctr_g(f_nearby)
if nearby_ctr is not None:
top_candidates.append(nearby_ctr)
if top_candidates and len(top_candidates) >= 2:
top_positions = np.stack([top_ctr] + top_candidates)
# Median filter followed by light smoothing
top_x_values = top_positions[:, 0]
top_x_median = np.median(top_x_values)
# EWMA with conservative alpha on the median-filtered result
alpha = 0.2
top_ctr_gu = np.array([top_x_median, top_ctr[1]])
dbg(f"[SWAY] Two-stage denoising: raw_top_x={top_ctr[0]:.2f}, median_filtered={top_x_median:.2f}")
# Calculate sway in ground units, then convert to inches
if top_ctr_gu is not None and setup_ctr_gu is not None and fwd_hat is not None:
sway_gu = float(np.dot(top_ctr_gu - setup_ctr_gu, fwd_hat))
hip_sway_in = sway_gu * inch_scale
else:
dbg("[SWAY] Missing required data for sway calculation")
return None
# Convert jitter to inches using same scale
setup_jitter_in = setup_jitter * inch_scale if setup_jitter is not None else 0.0
# Lightweight yaw/foreshortening correction using hip span ratio
ratio_hip = 1.0 # Default if no correction available
if world_landmarks and setup_frames:
try:
# Calculate hip span ratio from setup to top for yaw correction
setup_hip_spans = []
for f in setup_frames[:3]: # Use first few setup frames
if f in world_landmarks:
wf = world_landmarks[f]
if len(wf) > 24 and wf[23] and wf[24]: # L_HIP, R_HIP
span = np.linalg.norm(np.array(wf[24][:3]) - np.array(wf[23][:3]))
if 0.15 <= span <= 0.6: # Reasonable hip span in meters
setup_hip_spans.append(span)
if setup_hip_spans and top_frame in world_landmarks:
wf_top = world_landmarks[top_frame]
if len(wf_top) > 24 and wf_top[23] and wf_top[24]:
top_hip_span = np.linalg.norm(np.array(wf_top[24][:3]) - np.array(wf_top[23][:3]))
setup_hip_span = np.median(setup_hip_spans)
ratio_hip = top_hip_span / setup_hip_span
ratio_hip = np.clip(ratio_hip, 0.80, 1.00) # Conservative clamp
dbg(f"[SWAY] Hip span ratio: {ratio_hip:.3f} (setup={setup_hip_span:.3f}m, top={top_hip_span:.3f}m)")
except Exception as e:
dbg(f"[SWAY] Hip span ratio calculation failed: {e}")
# Apply yaw correction
hip_sway_in_corrected = hip_sway_in / ratio_hip
dbg(f"[SWAY] Yaw correction: raw={hip_sway_in:.2f}\", corrected={hip_sway_in_corrected:.2f}\" (ratio={ratio_hip:.3f})")
# Use corrected value
hip_sway_in = hip_sway_in_corrected
# STANCE WIDTH NORMALIZATION: Calculate stance width for normalized sway
stance_width_in = None
if gu_per_inch_list and world_landmarks:
# Use the same calibration frames to get stance width
stance_widths = []
for k in setup_frames:
if k in world_landmarks:
try:
Lt_w, Rt_w = world_landmarks[k][31], world_landmarks[k][32]
toe_world_m = np.linalg.norm(np.array(Lt_w[:3]) - np.array(Rt_w[:3]))
stance_widths.append(float(toe_world_m * 39.3701)) # Convert to inches
except:
pass
if stance_widths:
stance_width_in = np.median(stance_widths)
# Normalize and log (don't change return type)
normalized_sway = None
if stance_width_in and 14 <= stance_width_in <= 28: # Reasonable stance width range
normalized_sway = hip_sway_in / stance_width_in
# Calculate trust ratio with robust jitter
sway_jitter_ratio = abs(hip_sway_in) / max(setup_jitter_in, 1e-6)
trust_flag = "trusted" if sway_jitter_ratio >= 1.5 else "low-trust"
# Enhanced debug output with all the key metrics
norm_str = f"{normalized_sway:.3f}" if normalized_sway is not None else "NA"
dbg(f"[SWAY] dx={hip_sway_in:.2f}in, norm={norm_str}, jitter={setup_jitter_in:.2f}in, ratio={sway_jitter_ratio:.1f}")
# Safe formatting for sway_gu which might be None in some edge cases
sway_gu_str = f"{sway_gu:.2f}" if sway_gu is not None else "None"
gu_per_in_str = f"{gu_per_in:.3f}" if gu_per_in is not None else "None"
dbg(f"[SWAY] addr→top pelvis Δx={sway_gu_str}gu @ gu_per_in={gu_per_in_str}, trust={trust_flag}")
if stance_width_in:
dbg(f"[SWAY] stance_width={stance_width_in:.1f}\", target_normalized≈0.16 for pros")
# Trust but verify debug printout
hip_sway_str = f"{hip_sway_in:.1f}" if hip_sway_in is not None else "None"
gu_per_in_final_str = f"{gu_per_in:.3f}" if gu_per_in is not None else "None"
dbg(f"[SWAY] final hip_sway_in={hip_sway_str}\" with gu_per_in={gu_per_in_final_str}")
# Return signed, un-clipped value to preserve direction; clip only for UI bands if desired
return float(hip_sway_in)
def _shaft_axis_hat(lm, handedness='right'):
"""Shaft axis proxy: line through wrists (trail->lead). Deterministic and stable."""
if not lm: return None
L_WR, R_WR = 15, 16
if not lm[L_WR] or not lm[R_WR] or lm[L_WR][2] < 0.3 or lm[R_WR][2] < 0.3:
return None
# Build trail→lead vector based on handedness
if handedness == 'right':
# Right-handed: trail=16 (right wrist), lead=15 (left wrist)
trail_wrist, lead_wrist = R_WR, L_WR
else:
# Left-handed: trail=15 (left wrist), lead=16 (right wrist)
trail_wrist, lead_wrist = L_WR, R_WR
v = np.array([lm[lead_wrist][0] - lm[trail_wrist][0], lm[lead_wrist][1] - lm[trail_wrist][1]], float) # trail->lead
n = np.linalg.norm(v)
return v / n if n > 1e-6 else None
def _hinge2d_shaft(lm, handed='right', roll_deg=0.0):
"""Lead-forearm only wrist hinge = angle between lead forearm and shaft vectors.
Softened gating and consistent lead-only measurement."""
if not lm: return None
# Lead forearm only - consistent landmark selection
EL, WR = (13, 15) if handed=='right' else (14, 16) # lead elbow/wrist
if not lm[EL] or not lm[WR] or lm[EL][2] < 0.4 or lm[WR][2] < 0.4: # Relaxed visibility threshold
return None
# Lead forearm vector from elbow to wrist
forearm_vec = np.array([lm[WR][0] - lm[EL][0], lm[WR][1] - lm[EL][1]], float)
# Shaft vector using both wrists for more stable direction
shaft_vec = _shaft_axis_hat(lm, handed)
if shaft_vec is None:
# Fallback: use wrist-to-wrist vector as shaft proxy
if lm[15] and lm[16] and lm[15][2] >= 0.4 and lm[16][2] >= 0.4:
# Build trail→lead vector based on handedness for consistency
if handed == 'right':
trail_wrist, lead_wrist = 16, 15
else:
trail_wrist, lead_wrist = 15, 16
shaft_vec = np.array([lm[lead_wrist][0] - lm[trail_wrist][0], lm[lead_wrist][1] - lm[trail_wrist][1]], float)
else:
return None
# De-roll both vectors using the provided roll correction
cr, sr = math.cos(math.radians(-roll_deg)), math.sin(math.radians(-roll_deg))
def deroll(v): return np.array([cr*v[0] - sr*v[1], sr*v[0] + cr*v[1]], float)
forearm_vec = deroll(forearm_vec)
shaft_vec = deroll(shaft_vec)
# Normalize vectors
forearm_norm = np.linalg.norm(forearm_vec)
shaft_norm = np.linalg.norm(shaft_vec)
if forearm_norm < 1e-6 or shaft_norm < 1e-6:
return None
# Calculate angle between lead forearm and shaft
cos_angle = float(np.clip(np.dot(forearm_vec, shaft_vec) / (forearm_norm * shaft_norm), -1.0, 1.0))
angle_between = math.degrees(math.acos(abs(cos_angle))) # Always positive angle
# Report acute angle (angle between forearm and shaft) instead of obtuse
# This puts results in the expected pro range (~50-70° acute)
hinge_acute = angle_between
hinge_obtuse = 180.0 - angle_between # Traditional radial deviation for debugging
# Debug logging: keep both for debugging but standardize to acute
dbg(f"[HINGE] acute={hinge_acute:.1f}°, obtuse={hinge_obtuse:.1f}° (reporting acute)")
return float(hinge_acute)
def _pelvis_ctr_ground_series(sm, frames, warp2):
"""Get pelvis centers in ground plane for given frames"""
ctrs = []
for f in frames:
lm = sm.get(f)
if not lm or not lm[23] or not lm[24]:
ctrs.append(None)
continue
g = warp2([[lm[23][0], lm[23][1]], [lm[24][0], lm[24][1]]])
ctrs.append(((g[0]+g[1])/2).astype(float))
return ctrs
def _refine_top_by_lat_vel(sm, backswing_frames, warp2, target_hat_g, init_top):
"""Refine top frame using lateral pelvis velocity zero-crossing"""
# compute pelvis centers along backswing and project onto target axis
ctrs = _pelvis_ctr_ground_series(sm, backswing_frames, warp2)
ts = [i for i,c in enumerate(ctrs) if c is not None]
if len(ts) < 5:
return init_top # not enough samples
proj = np.array([float(np.dot(ctrs[i], target_hat_g)) if ctrs[i] is not None else np.nan for i in range(len(ctrs))])
# smooth (5-pt)
k = 5
proj_s = np.convolve(np.nan_to_num(proj), np.ones(k)/k, mode='same')
# finite diff
vel = np.gradient(proj_s)
# take frame near init_top where vel≈0 and |vel| is locally minimal (change of direction)
idx0 = backswing_frames.index(init_top) if init_top in backswing_frames else len(backswing_frames)-1
win = range(max(0, idx0-4), min(len(vel), idx0+5))
# find zero-cross or min |vel|
cand = min(win, key=lambda i: abs(vel[i]))
return backswing_frames[cand]
def _point_ground(frame_idx, sm, warp2, i):
"""Get ground-plane position of landmark i in frame frame_idx"""
lm = sm.get(frame_idx)
if not lm or not lm[i]: return None
return warp2([[lm[i][0], lm[i][1]]])[0].astype(float)
def _pick_p3_lead_arm_parallel(sm, backswing_frames, handed='right', tol_deg=15.0):
"""Pick the backswing frame where the lead shoulder→lead wrist vector is most horizontal."""
L_SHO, WR = (11, 15) if handed=='right' else (12, 16)
best = None
best_abs = 1e9
for f in backswing_frames:
lm = sm.get(f)
if not lm or not lm[L_SHO] or not lm[WR]:
continue
if lm[L_SHO][2] < 0.5 or lm[WR][2] < 0.5:
continue
dx = lm[WR][0] - lm[L_SHO][0]
dy = lm[WR][1] - lm[L_SHO][1]
if abs(dx) + abs(dy) < 1e-6:
continue
ang = abs(math.degrees(math.atan2(dy, dx))) # 0° is perfectly horizontal
if ang < best_abs:
best_abs, best = ang, f
return best
def _pick_top_by_wrist_height(sm, backswing, handed='right', min_vis=0.7):
"""Pick top frame by highest wrist position with improved validation"""
WR = 15 if handed=='right' else 16
SH = 11 if handed=='right' else 12 # lead shoulder
best, best_y = None, 1e9
candidates = []
consecutive_rejects = 0
max_consecutive_rejects = 10 # Stop after N consecutive rejects to reduce spam
for f in backswing:
lm = sm.get(f)
if not lm or not lm[WR] or not lm[SH] or lm[WR][2] < min_vis or lm[SH][2] < min_vis:
consecutive_rejects += 1
if consecutive_rejects >= max_consecutive_rejects:
dbg(f"[WRIST] Skipping {max_consecutive_rejects}+ consecutive bad frames, jumping ahead")
break
continue
# Reject frames with shoulder-to-wrist line near vertical (club laid off/occluded)
# Calculate angle of shoulder-to-wrist line from horizontal
dx = lm[WR][0] - lm[SH][0]
dy = lm[WR][1] - lm[SH][1]
if abs(dx) < 1e-6: # Perfectly vertical
consecutive_rejects += 1
if consecutive_rejects >= max_consecutive_rejects:
dbg(f"[WRIST] Skipping {max_consecutive_rejects}+ consecutive steep frames, jumping ahead")
break
continue
angle_from_horizontal = abs(math.degrees(math.atan2(dy, dx)))
# Unified gating constant - use everywhere that gates by shoulder-wrist angle
SHO_WRIST_STEEP_MAX = 120.0 # was ~105 in some functions
if angle_from_horizontal > SHO_WRIST_STEEP_MAX:
consecutive_rejects += 1
if consecutive_rejects >= max_consecutive_rejects:
dbg(f"[WRIST] Skipping {max_consecutive_rejects}+ consecutive steep frames, jumping ahead")
break
# Only log first few rejects to reduce spam
if consecutive_rejects <= 3:
dbg(f"[WRIST] Rejecting frame {f}: shoulder-wrist angle {angle_from_horizontal:.1f}° too steep")
continue
# Found a good frame, reset consecutive reject counter
consecutive_rejects = 0
y = lm[WR][1] # smaller y = higher hand (screen coords)
candidates.append((f, y, angle_from_horizontal))
if y < best_y:
best_y, best = y, f
# Additional validation: ensure chosen frame is not an outlier
if best is not None and candidates:
# Check if there are other reasonable candidates nearby in height
height_tolerance = 20 # pixels
reasonable_candidates = [c for c in candidates if abs(c[1] - best_y) <= height_tolerance]
if len(reasonable_candidates) > 1:
# Among height-similar candidates, prefer one with better shoulder-wrist geometry
reasonable_candidates.sort(key=lambda x: x[2]) # Sort by angle (smaller is better)
best_candidate = reasonable_candidates[0]
best = best_candidate[0]
dbg(f"[WRIST] Selected frame {best} with angle {best_candidate[2]:.1f}° from {len(reasonable_candidates)} candidates")
return best
def _pick_top_frame_hinge(sm, backswing_frames, handed='right', roll_deg=0.0):
"""Pick frame with maximum hinge in backswing, with visibility and geometry validation"""
if not backswing_frames:
return None
SH = 11 if handed=='right' else 12 # lead shoulder
WR = 15 if handed=='right' else 16 # lead wrist
# Restrict to P3→P4 window by limiting to last 70% of backswing
p3_start_idx = max(0, int(0.3 * len(backswing_frames)))
p3_p4_frames = backswing_frames[p3_start_idx:]
top_vals = []
for f in p3_p4_frames:
lm = sm.get(f)
if not lm or not lm[WR] or not lm[SH] or lm[WR][2] < 0.7 or lm[SH][2] < 0.7:
continue
# Reject frames with shoulder-to-wrist line near vertical (same logic as height-based)
dx = lm[WR][0] - lm[SH][0]
dy = lm[WR][1] - lm[SH][1]
if abs(dx) < 1e-6: # Perfectly vertical
continue
angle_from_horizontal = abs(math.degrees(math.atan2(dy, dx)))
SHO_WRIST_STEEP_MAX = 120.0 # Unified constant
if angle_from_horizontal > SHO_WRIST_STEEP_MAX: # Use unified threshold
continue
h = _hinge2d_shaft(lm, handed, roll_deg)
if h is not None:
# Additional sanity check for hinge values (h is now acute angle)
if 10.0 <= h <= 120.0: # plausible range for acute angles
top_vals.append((f, h))
if not top_vals:
return None
# Return frame with maximum hinge
top_frame = max(top_vals, key=lambda x: x[1])[0]
return top_frame
def calculate_wrist_hinge_at_top(pose_data: Dict[int, List], swing_phases: Dict[str, List],
handedness: str = 'right') -> Union[Dict[str, float], None]:
"""Calculate wrist hinge using shaft-based method with both P3 and top measurements"""
# Define indices locally to avoid hidden globals
L_EYE, R_EYE = 2, 5
L_HIP, R_HIP = 23, 24
sm = smooth_landmarks(pose_data)
backswing_frames = swing_phases.get("backswing", [])
if not backswing_frames:
# Fallback: use any available frames as "backswing"
all_frames = sorted(pose_data.keys())
if all_frames:
# Use middle third of available frames as "backswing"
mid_start = len(all_frames) // 3
mid_end = 2 * len(all_frames) // 3
backswing_frames = all_frames[mid_start:mid_end] or all_frames
else:
return None # Truly no data
# Reuse consensus roll from shoulder tilt function
def consensus_roll(lm, H=None, W=None):
cands = []
for iL, iR in [(L_EYE, R_EYE), (L_HIP, R_HIP)]:
if lm[iL] and lm[iR]:
dx = lm[iR][0] - lm[iL][0]
dy = lm[iR][1] - lm[iL][1]
if abs(dx)+abs(dy) > 1e-6:
t = math.degrees(math.atan2(dy, dx))
t = ((t + 90.0) % 180.0) - 90.0
if abs(t) <= 20: cands.append(t)
return float(np.median(cands)) if cands else 0.0
# Initial roll from setup frames (clipped conservatively)
setup_frames = swing_phases.get("setup", [])[:8]
setup_rolls = [consensus_roll(sm.get(f)) for f in setup_frames if sm.get(f)]
setup_roll_deg = float(np.clip(np.median(setup_rolls) if setup_rolls else 0.0, -10.0, 10.0))
# More robust top frame selection: try multiple methods
top_frame = _pick_top_by_wrist_height(sm, backswing_frames, handed=handedness)
if not top_frame:
top_frame = _pick_top_frame_hinge(sm, backswing_frames, handed=handedness, roll_deg=setup_roll_deg)
if not top_frame and backswing_frames:
# Last resort: use frame with max backswing index that has good landmarks
for f in reversed(backswing_frames):
lm = sm.get(f)
EL, WR = (13, 15) if handedness=='right' else (14, 16)
if lm and lm[EL] and lm[WR] and lm[EL][2] >= 0.5 and lm[WR][2] >= 0.5:
top_frame = f
dbg(f"[WRIST] Using fallback top frame: {f}")
break
# Ensure we never have None for top_frame
if top_frame is None and backswing_frames:
top_frame = backswing_frames[-1]
# Compute local roll around top frame for better accuracy
roll_deg = setup_roll_deg # Start with setup roll as baseline
if top_frame is not None:
# Get frames around top for local roll calculation
local_window = [f for f in range(max(backswing_frames[0], top_frame-4),
min(backswing_frames[-1], top_frame+5)+1) if f in backswing_frames]
local_rolls = [consensus_roll(sm.get(f)) for f in local_window if sm.get(f)]
if local_rolls:
# Use wider clipping range for top position (±20-25°)
local_roll = float(np.median(local_rolls))
roll_deg = float(np.clip(local_roll, -25.0, 25.0))
dbg(f"[WRIST] Local roll at top: {roll_deg:.1f}° (setup: {setup_roll_deg:.1f}°)")
else:
dbg(f"[WRIST] Using setup roll: {roll_deg:.1f}° (no local roll data)")
# FIXED: Calculate hinge with moving window average around top frame for smoothness
hinge_top = None
if top_frame is not None:
# Use ±2 frames around top for moving window average (5-frame window)
win = [f for f in range(max(backswing_frames[0], top_frame-2),
min(backswing_frames[-1], top_frame+2)+1) if f in backswing_frames]
valid_hinges = []
for f in win:
landmarks = sm.get(f)
if landmarks:
h = _hinge2d_shaft(landmarks, handedness, roll_deg)
if h is not None:
# h here is now acute angle (angle between forearm and shaft)
if 10.0 <= h <= 120.0: # plausible range for acute angles
valid_hinges.append((f, h))
dbg(f"[WRIST] frame {f}: hinge_acute={h:.1f}°")
if valid_hinges:
# Use moving window average of valid hinge values for smoothness
hinge_values = [h for _, h in valid_hinges]
if len(hinge_values) >= 3:
# Remove outliers and take median for robustness
hinge_array = np.array(hinge_values)
median_val = np.median(hinge_array)
mad = np.median(np.abs(hinge_array - median_val))
outlier_threshold = median_val + 2.5 * mad # Remove values more than 2.5 MAD from median
filtered_hinges = [h for h in hinge_values if abs(h - median_val) <= 2.5 * mad]
if filtered_hinges:
hinge_top = float(np.mean(filtered_hinges)) # Use mean of filtered values
dbg(f"[WRIST] Smoothed hinge (mean of {len(filtered_hinges)} values): {hinge_top:.1f}°")
else:
hinge_top = float(median_val)
else:
# Not enough data for robust averaging, use median
hinge_top = float(np.median(hinge_values))
dbg(f"[WRIST] Limited data, using median of {len(hinge_values)} values: {hinge_top:.1f}°")
else:
# Fallback: find best single frame
best = (top_frame, None)
for f in win:
landmarks = sm.get(f)
if landmarks:
h = _hinge2d_shaft(landmarks, handedness, roll_deg)
if h is not None and (best[1] is None or h > best[1]):
best = (f, h)
top_frame, hinge_top = best
# Additional validation to catch landmark selection errors (now acute angles)
if hinge_top is not None and (hinge_top < 5.0 or hinge_top > 120.0):
dbg(f"WARNING: Suspicious acute hinge angle {hinge_top:.1f}° - possible landmark error")
hinge_top = np.clip(hinge_top, 20.0, 90.0) # Conservative clamp for acute angles
dbg(f"top_selection: selected={top_frame}")
# FIXED: P3 frame and hinge with better validation
p3_frame = _pick_p3_lead_arm_parallel(sm, backswing_frames, handedness)
hinge_p3 = None
if p3_frame is not None:
p3_landmarks = sm.get(p3_frame)
if p3_landmarks:
hinge_p3 = _hinge2d_shaft(p3_landmarks, handedness, roll_deg)
# Validate P3 hinge angle is reasonable (now acute angle)
if hinge_p3 is not None and (hinge_p3 < 10.0 or hinge_p3 > 120.0):
dbg(f"WARNING: Suspicious P3 acute hinge angle {hinge_p3:.1f}° - possible landmark error")
# FIXED: Address baseline with improved landmark validation
addr_frames = _pick_stable_setup_frames(sm, swing_phases, max_frames=6) or _address_frames(swing_phases)
addr_vals = []
for f in addr_frames[:6]:
addr_landmarks = sm.get(f)
if addr_landmarks:
h = _hinge2d_shaft(addr_landmarks, handedness, roll_deg)
# Remove address hinge range filter - accept all valid measurements
if h is not None:
addr_vals.append(h)
hinge_addr = float(np.median(addr_vals)) if addr_vals else None
# If top hinge failed, borrow P3; if that also failed, borrow address
if hinge_top is None:
if hinge_p3 is not None:
hinge_top = hinge_p3
dbg(f"hinge_top was None, borrowed from P3: {hinge_top}")
elif hinge_addr is not None:
hinge_top = hinge_addr # last resort (won't be graded as "top", but avoids None)
dbg(f"hinge_top was None, borrowed from address: {hinge_top}")
# Remove auto-promotion to avoid convergence bias
# Keep original values to show real differences
# hinge_top and hinge_p3 are now acute angles (forearm-shaft angle)
theta_top = hinge_top # No conversion needed, already acute
theta_p3 = hinge_p3 # No conversion needed, already acute
# Debug prints to verify shaft angles
if theta_top is not None:
dbg(f"[WRIST-CHECK] theta_top = {theta_top:.1f}° (acute angle between forearm & shaft)")
# Also calculate obtuse angles for backward compatibility and debugging
obtuse_top = None if hinge_top is None else (180.0 - hinge_top)
obtuse_p3 = None if hinge_p3 is None else (180.0 - hinge_p3)
# Enhanced debug output with window information
window_size = len([f for f in range(max(backswing_frames[0], top_frame-2),
min(backswing_frames[-1], top_frame+2)+1) if f in backswing_frames]) if top_frame else 0
hinge_str = f"{hinge_top:.1f}" if hinge_top is not None else "None"
dbg(f"[WRIST] samples={window_size}, mean={hinge_str}°, window=±2")
dbg(f"[WRIST] roll={roll_deg:.1f}°, addr={hinge_addr}, p3={hinge_p3}, top={hinge_top}, "
f"Δp3={None if hinge_addr is None or hinge_p3 is None else hinge_p3-hinge_addr}, "
f"Δtop={None if hinge_addr is None or hinge_top is None else hinge_top-hinge_addr}")
# Trust but verify debug printout
if hinge_top is not None:
dbg(f"[WRIST] final wrist_hinge_top={hinge_top:.1f}° (handed={handedness})")
else:
dbg(f"[WRIST] final wrist_hinge_top=None (handed={handedness})")
result = {
# Report acute angles (what coaches expect: ~50-70° for pros)
"forearm_shaft_angle_top_deg": theta_top,
"forearm_shaft_angle_p3_deg": theta_p3,
"addr_deg": hinge_addr,
"delta_deg_p3": (hinge_p3 - hinge_addr) if (hinge_p3 is not None and hinge_addr is not None) else None,
"delta_deg_top": (hinge_top - hinge_addr) if (hinge_top is not None and hinge_addr is not None) else None,
# NEW: Standardize to acute angles for main reporting
"acute_angle_deg_top": hinge_top,
"acute_angle_deg_p3": hinge_p3,
# Keep obtuse angles for debugging and backward compatibility
"obtuse_angle_deg_top": obtuse_top,
"obtuse_angle_deg_p3": obtuse_p3,
# Flag outside typical pro band for P3 (now using acute angle range)
"definition_suspect": (hinge_p3 is None) or not (40 <= hinge_p3 <= 80),
# Legacy field for backward compatibility - use acute angle
"radial_deviation_deg": hinge_p3 if hinge_p3 is not None else hinge_top
}
return result
def compute_front_facing_metrics(pose_data: Dict[int, List], swing_phases: Dict[str, List],
world_landmarks: Optional[Dict[int, List]] = None,
frames: Optional[List[np.ndarray]] = None,
club: str = "iron",
handedness: str = "right") -> Dict[str, Dict[str, Union[float, str, None]]]:
"""Compute the 4 front-facing golf swing metrics with club-aware grading"""
# Calculate individual metrics
torso_sidebend_result = calculate_torso_sidebend_at_impact(
pose_data, swing_phases, frames=frames, handedness=handedness, world_landmarks=world_landmarks
)
# Extract value for compatibility
shoulder_tilt_impact = torso_sidebend_result.get("value") if torso_sidebend_result else None
hip_sway_top = calculate_hip_sway_at_top(pose_data, swing_phases, world_landmarks)
wrist_hinge_result = calculate_wrist_hinge_at_top(pose_data, swing_phases, handedness=handedness)
hip_shoulder_sep_result = calculate_hip_shoulder_separation_at_impact(pose_data, swing_phases, handedness)
# Extract wrist hinge values (NEW: prioritize forearm-shaft angle measurements)
wrist_top_theta = wrist_hinge_result.get('forearm_shaft_angle_top_deg') if wrist_hinge_result else None
wrist_p3_theta = wrist_hinge_result.get('forearm_shaft_angle_p3_deg') if wrist_hinge_result else None
wrist_p3 = wrist_hinge_result.get('radial_deviation_deg_p3') if wrist_hinge_result else None
wrist_top = wrist_hinge_result.get('radial_deviation_deg_top') if wrist_hinge_result else None
wrist_addr = wrist_hinge_result.get('addr_deg') if wrist_hinge_result else None
delta_p3 = wrist_hinge_result.get('delta_deg_p3') if wrist_hinge_result else None
delta_top = wrist_hinge_result.get('delta_deg_top') if wrist_hinge_result else None
definition_suspect = wrist_hinge_result.get('definition_suspect', False) if wrist_hinge_result else False
# Initialize to avoid undefined locals
wrist_final = None
wrist_kind = None
# Prefer radial deviation (hinge), then P3 radial, then θ as fallback
if wrist_top is not None:
wrist_final = wrist_top
wrist_kind = "radial_deviation_top"
elif wrist_p3 is not None:
wrist_final = wrist_p3
wrist_kind = "radial_deviation_p3"
elif wrist_top_theta is not None:
wrist_final = wrist_top_theta
wrist_kind = "forearm_shaft_angle_top"
else:
wrist_final = wrist_p3_theta
wrist_kind = "forearm_shaft_angle_p3"
# Add visible version tag and debug info
print(f"[FF] {METRICS_VERSION} running (debug={'on' if VERBOSE else 'off'})")
# Log metrics summary for debugging
dbg(f"[METRIC] torso_side_bend={shoulder_tilt_impact}, sway_top_in={hip_sway_top}, hinge_top={wrist_hinge_result.get('radial_deviation_deg_top') if wrist_hinge_result else None}, hip_sho_sep={hip_shoulder_sep_result.get('hip_shoulder_sep_deg') if hip_shoulder_sep_result else None}")
front_facing_metrics = {
'torso_side_bend_deg': {
'value': shoulder_tilt_impact,
'metric': torso_sidebend_result.get("metric") if torso_sidebend_result else None,
'quality': torso_sidebend_result.get("quality") if torso_sidebend_result else None,
'details': torso_sidebend_result.get("details") if torso_sidebend_result else None
},
'shoulder_tilt_impact_deg': {'value': shoulder_tilt_impact}, # Deprecated, alias for compatibility
'hip_sway_top_inches': {'value': hip_sway_top},
'hip_shoulder_separation_impact_deg': {
'value': hip_shoulder_sep_result.get('hip_shoulder_sep_deg') if hip_shoulder_sep_result else None,
'abs_deg': hip_shoulder_sep_result.get('hip_shoulder_sep_abs_deg') if hip_shoulder_sep_result else None,
'confidence': hip_shoulder_sep_result.get('confidence') if hip_shoulder_sep_result else 0.0
},
'wrist_hinge_top_deg': {
'value': wrist_final,
'value_kind': wrist_kind,
'forearm_shaft_angle_top_deg': wrist_top_theta,
'forearm_shaft_angle_p3_deg': wrist_p3_theta,
'radial_deviation_p3_deg': wrist_p3,
'radial_deviation_top_deg': wrist_top,
'addr_deg': wrist_addr,
'delta_p3_deg': delta_p3,
'delta_top_deg': delta_top,
'definition_suspect': definition_suspect
},
'__version__': METRICS_VERSION,
'_debug': {
'version': METRICS_VERSION,
'used_world': bool(world_landmarks),
'setup_frames': swing_phases.get('setup', [])[:8],
'impact_frames': swing_phases.get('impact', []),
}
}
return front_facing_metrics