""" Simple swing segmentation with downswing re-gating """ def _dt_and_fps(frame_timestamps_ms, frames: int, total_ms: float): """Calculate time delta and FPS from frame data""" if frame_timestamps_ms and len(frame_timestamps_ms) >= 2: dt = (frame_timestamps_ms[-1] - frame_timestamps_ms[0]) / max(len(frame_timestamps_ms) - 1, 1) / 1000.0 else: dt = (total_ms / 1000.0) / max(frames, 1) return dt, 1.0 / max(dt, 1e-6) def detect_arm_velocity_zero_crossing(pose_data, frames): """Simple fallback for top detection""" if not frames: return 0 return frames[len(frames)//3] # Simple fallback def segment_swing(pose_data, detections=None, sample_rate=1, frame_shape=None, frame_timestamps_ms=None, total_ms=None, fps=30.0): """ Simplified swing segmentation with downswing re-gating logic. Args: pose_data: Dictionary mapping frame indices to pose keypoints detections: Object detections (unused in current implementation) sample_rate: Frame sampling rate frame_shape: Frame shape (unused in current implementation) frame_timestamps_ms: List of frame timestamps in milliseconds total_ms: Total video duration in milliseconds fps: Video frame rate (fallback if timestamps not available) Returns: dict: Dictionary with swing phases and timing_unreliable flag """ frames = [i for i in sorted(pose_data) if pose_data[i] is not None] out = {"setup":[], "backswing":[], "downswing":[], "impact":[], "follow_through":[], "timing_unreliable": False} if not frames: return out # Improved segmentation using biomechanical markers total_frames = len(frames) # Setup phase - first 12.5% (this is fairly reliable) setup_end_idx = max(1, total_frames // 8) setup_end = frames[setup_end_idx] # Detect top of swing using arm velocity zero crossing (more reliable than time-based) backswing_frames_for_analysis = [f for f in frames if f > setup_end] top = detect_arm_velocity_zero_crossing(pose_data, backswing_frames_for_analysis) # Robust impact detection using clubhead velocity zero-crossing # Look for the frame where clubhead velocity changes from downward to upward impact_candidates = [] if pose_data: # Get frames after top for impact analysis frames_after_top = [f for f in frames if f > top and f in pose_data] if len(frames_after_top) >= 5: clubhead_positions = [] valid_frames = [] for frame_idx in frames_after_top: kp = pose_data[frame_idx] if kp and len(kp) > 15: # Use wrist as proxy for clubhead (lead arm wrist for right-handed) wrist = kp[15][:2] # Right wrist if kp[15][2] > 0.5: # Good visibility clubhead_positions.append(wrist[1]) # Y-coordinate (vertical) valid_frames.append(frame_idx) if len(clubhead_positions) >= 5: # Calculate vertical velocity (downward = positive, upward = negative) velocities = [] for i in range(1, len(clubhead_positions)): vel = clubhead_positions[i] - clubhead_positions[i-1] velocities.append(vel) # Find zero crossing: velocity changes from positive to negative for i in range(1, len(velocities)): if velocities[i-1] > 0 and velocities[i] <= 0: # Found impact - clubhead starts moving upward impact_candidates.append(valid_frames[i]) break # Fallback: if no velocity zero-crossing found, use timing-based estimate if not impact_candidates: top_idx_in_total = frames.index(top) if top in frames else total_frames // 3 remaining_frames_after_top = total_frames - top_idx_in_total # Tour pro downswing: ~8-10 frames at 30fps (25-30% of total swing) expected_downswing_frames = max(8, int(total_frames * 0.25)) impact_idx = min(total_frames - 1, top_idx_in_total + expected_downswing_frames) imp = frames[impact_idx] else: imp = impact_candidates[0] # Assign frames to phases for f in frames: if f <= setup_end: out["setup"].append(f) elif f <= top: out["backswing"].append(f) elif f < imp: out["downswing"].append(f) elif f == imp: out["impact"].append(f) else: out["follow_through"].append(f) # Get timing information for downswing re-gating if total_ms is None: total_ms = total_frames * (1000.0 / fps) # fallback estimate dt, actual_fps = _dt_and_fps(frame_timestamps_ms, total_frames, total_ms) # Downswing re-gating logic downswing_frames = out["downswing"] if downswing_frames: downswing_duration_frames = len(downswing_frames) # Scale expected range by fps (~30 fps baseline) fps_scale = actual_fps / 30.0 min_expected = max(1, int(6 * fps_scale)) max_expected = int(15 * fps_scale) # Check if downswing is outside expected range if downswing_duration_frames < min_expected or downswing_duration_frames > max_expected: # Mark timing as unreliable when downswing duration is outside expected range # (Angular velocity re-gating removed for 5 core metrics simplification) out["timing_unreliable"] = True return out