from __future__ import annotations import math from pathlib import Path import sys import unittest sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) from pozify.contracts import PoseFrame, PoseSequence, UserProfile, VideoManifest, validate_contract from pozify.exercises import create_exercise_strategy from pozify.exercises.push_up import PushUpExercise from pozify.exercises.shoulder_press import ShoulderPressExercise from pozify.exercises.squat import SquatExercise from pozify.steps.rep_signals import angle_deg def _frame(frame_index: int, landmarks: dict[str, dict[str, float]]) -> PoseFrame: return PoseFrame( frame_index=frame_index, timestamp_sec=round(frame_index / 30.0, 3), landmarks=landmarks, world_landmarks={}, pose_quality={"mean_visibility": 0.95, "normalized": True}, ) def _wave(frame_index: int, cycle_frames: int, *, phase_shift: float = 0.0) -> float: return (1.0 - math.cos(2.0 * math.pi * ((frame_index + phase_shift) / cycle_frames))) / 2.0 def _squat_landmarks(depth: float) -> dict[str, dict[str, float]]: hip_y = 0.52 + depth * 0.18 shoulder_y = 0.28 + depth * 0.06 knee_y = 0.7 ankle_y = 0.92 hip_x_left, hip_x_right = 0.43, 0.57 knee_x_left = hip_x_left + depth * 0.06 knee_x_right = hip_x_right - depth * 0.06 return { "left_shoulder": {"x": 0.4, "y": shoulder_y, "smoothed_x": 0.4, "smoothed_y": shoulder_y}, "right_shoulder": {"x": 0.6, "y": shoulder_y, "smoothed_x": 0.6, "smoothed_y": shoulder_y}, "left_hip": {"x": hip_x_left, "y": hip_y, "smoothed_x": hip_x_left, "smoothed_y": hip_y}, "right_hip": {"x": hip_x_right, "y": hip_y, "smoothed_x": hip_x_right, "smoothed_y": hip_y}, "left_knee": {"x": knee_x_left, "y": knee_y, "smoothed_x": knee_x_left, "smoothed_y": knee_y}, "right_knee": {"x": knee_x_right, "y": knee_y, "smoothed_x": knee_x_right, "smoothed_y": knee_y}, "left_ankle": {"x": 0.42, "y": ankle_y, "smoothed_x": 0.42, "smoothed_y": ankle_y}, "right_ankle": {"x": 0.58, "y": ankle_y, "smoothed_x": 0.58, "smoothed_y": ankle_y}, } def _push_up_landmarks(depth: float) -> dict[str, dict[str, float]]: shoulder_y = 0.38 + depth * 0.16 hip_y = 0.46 + depth * 0.16 ankle_y = 0.54 + depth * 0.16 elbow_y = 0.46 + depth * 0.1 wrist_y = 0.5 + depth * 0.04 elbow_x_left = 0.36 + depth * 0.02 elbow_x_right = 0.64 - depth * 0.02 return { "left_shoulder": {"x": 0.3, "y": shoulder_y, "smoothed_x": 0.3, "smoothed_y": shoulder_y}, "right_shoulder": {"x": 0.7, "y": shoulder_y, "smoothed_x": 0.7, "smoothed_y": shoulder_y}, "left_elbow": {"x": elbow_x_left, "y": elbow_y, "smoothed_x": elbow_x_left, "smoothed_y": elbow_y}, "right_elbow": {"x": elbow_x_right, "y": elbow_y, "smoothed_x": elbow_x_right, "smoothed_y": elbow_y}, "left_wrist": {"x": 0.34, "y": wrist_y, "smoothed_x": 0.34, "smoothed_y": wrist_y}, "right_wrist": {"x": 0.66, "y": wrist_y, "smoothed_x": 0.66, "smoothed_y": wrist_y}, "left_hip": {"x": 0.42, "y": hip_y, "smoothed_x": 0.42, "smoothed_y": hip_y}, "right_hip": {"x": 0.58, "y": hip_y, "smoothed_x": 0.58, "smoothed_y": hip_y}, "left_ankle": {"x": 0.44, "y": ankle_y, "smoothed_x": 0.44, "smoothed_y": ankle_y}, "right_ankle": {"x": 0.56, "y": ankle_y, "smoothed_x": 0.56, "smoothed_y": ankle_y}, } def _shoulder_press_landmarks(lift: float) -> dict[str, dict[str, float]]: wrist_y = 0.76 - lift * 0.34 elbow_y = 0.6 - lift * 0.1 elbow_x_left = 0.4 - lift * 0.05 elbow_x_right = 0.6 + lift * 0.05 return { "left_shoulder": {"x": 0.42, "y": 0.42, "smoothed_x": 0.42, "smoothed_y": 0.42}, "right_shoulder": {"x": 0.58, "y": 0.42, "smoothed_x": 0.58, "smoothed_y": 0.42}, "left_elbow": {"x": elbow_x_left, "y": elbow_y, "smoothed_x": elbow_x_left, "smoothed_y": elbow_y}, "right_elbow": {"x": elbow_x_right, "y": elbow_y, "smoothed_x": elbow_x_right, "smoothed_y": elbow_y}, "left_wrist": {"x": 0.4, "y": wrist_y, "smoothed_x": 0.4, "smoothed_y": wrist_y}, "right_wrist": {"x": 0.6, "y": wrist_y, "smoothed_x": 0.6, "smoothed_y": wrist_y}, } def _sequence_for_exercise(exercise: str, cycles: int, *, partial_tail: bool = False) -> PoseSequence: cycle_frames = 24 total_frames = cycles * cycle_frames + (cycle_frames // 2 if partial_tail else 0) frames: list[PoseFrame] = [] for frame_index in range(total_frames): wave = _wave(frame_index, cycle_frames) if exercise == "squat": landmarks = _squat_landmarks(wave) elif exercise == "shoulder_press": landmarks = _shoulder_press_landmarks(wave) else: landmarks = _push_up_landmarks(wave) frames.append(_frame(frame_index, landmarks)) return PoseSequence( frames=frames, normalized=True, smoothing_method="exponential_smoothing", pose_valid_ratio=1.0, ) def _video_manifest(sequence: PoseSequence) -> VideoManifest: return VideoManifest( video_path=None, fps=30.0, duration_sec=round(len(sequence.frames) / 30.0, 3), total_frames=len(sequence.frames), sampled_frames=len(sequence.frames), width=720, height=1280, codec=None, container=None, brightness_mean=None, blur_laplacian_var=None, quality_warnings=[], analysis_allowed=True, ) def _profile(exercise: str = "auto") -> UserProfile: return UserProfile( goal="beginner_practice", experience_level="beginner", intended_exercise=exercise, intended_variation=None, known_limitations=[], equipment="bodyweight", ) def _exercise_strategy(exercise: str, sequence: PoseSequence): return create_exercise_strategy( exercise, video_manifest=_video_manifest(sequence), pose_sequence=sequence, profile=_profile(exercise), ) class RepCounterTests(unittest.TestCase): def test_angle_deg_uses_3d_world_landmarks(self) -> None: frame = PoseFrame( frame_index=0, timestamp_sec=0.0, landmarks={}, world_landmarks={ "left_shoulder": {"x": 1.0, "y": 0.0, "z": 0.0}, "left_elbow": {"x": 0.0, "y": 0.0, "z": 0.0}, "left_wrist": {"x": 0.0, "y": 0.0, "z": 1.0}, }, pose_quality={"mean_visibility": 1.0}, ) self.assertEqual(round(angle_deg(frame, "left_shoulder", "left_elbow", "left_wrist") or 0.0), 90) def test_segments_squat_reps(self) -> None: sequence = _sequence_for_exercise("squat", 3) reps, debug = _exercise_strategy("squat", sequence).count() self.assertEqual(len(reps.reps), 3) self.assertEqual(reps.reps[0].start_frame, 0) self.assertTrue(debug["accepted_reps"]) def test_segments_push_up_reps(self) -> None: sequence = _sequence_for_exercise("push_up", 3) reps, _debug = _exercise_strategy("push_up", sequence).count() self.assertEqual(len(reps.reps), 3) self.assertEqual(reps.partial_reps, []) def test_segments_shoulder_press_reps(self) -> None: sequence = _sequence_for_exercise("shoulder_press", 3) reps, _debug = _exercise_strategy("shoulder_press", sequence).count() self.assertEqual(len(reps.reps), 3) self.assertEqual(reps.reps[0].mid_frame, 12) def test_reports_partial_last_rep(self) -> None: sequence = _sequence_for_exercise("push_up", 2, partial_tail=True) reps, _debug = _exercise_strategy("push_up", sequence).count() self.assertEqual(len(reps.reps), 2) self.assertIn("ends_mid_rep", {item["reason"] for item in reps.partial_reps}) def test_unknown_exercise_is_not_segmented(self) -> None: sequence = _sequence_for_exercise("push_up", 1) reps, debug = _exercise_strategy("unknown", sequence).count() self.assertEqual(reps.reps, []) self.assertEqual(reps.partial_reps, [{"reason": "unknown_exercise"}]) self.assertEqual(debug["selected_signal"], "none") validate_contract("reps.json", reps) validate_contract("rep_debug.json", debug) def test_exercises_resolve_to_specific_strategies(self) -> None: sequence = _sequence_for_exercise("push_up", 1) push_up = _exercise_strategy("push_up", sequence) self.assertIsInstance(push_up, PushUpExercise) self.assertIsInstance(_exercise_strategy("shoulder_press", sequence), ShoulderPressExercise) self.assertIsInstance(_exercise_strategy("squat", sequence), SquatExercise) self.assertIsNot(push_up, _exercise_strategy("push_up", sequence)) if __name__ == "__main__": unittest.main()