"""Tests for all rubric scorers and Phase 2 agents.""" from formscout.types import ( BiomechFeatures, ScoreResult, MovementResult, JudgeResult, ReportResult, ) from formscout.rubric import score_test, SCORERS from formscout.rubric.hurdle_step import score_hurdle_step from formscout.rubric.inline_lunge import score_inline_lunge from formscout.rubric.shoulder_mobility import score_shoulder_mobility from formscout.rubric.active_slr import score_active_slr from formscout.rubric.trunk_stability_pushup import score_trunk_stability_pushup from formscout.rubric.rotary_stability import score_rotary_stability from formscout.agents.judge import JudgeAgent from formscout.agents.report import ReportAgent def _make_features(test_name, angles=None, alignments=None, side="na", sym_delta=None): return BiomechFeatures( test_name=test_name, view="2d", side=side, angles=angles or {}, alignments=alignments or {}, symmetry_delta=sym_delta, timing={}, confidence=0.8, ) # ─── Rubric dispatch ───────────────────────────────────────────────────────── class TestRubricDispatch: def test_all_tests_have_scorers(self): expected = {"deep_squat", "hurdle_step", "inline_lunge", "shoulder_mobility", "active_slr", "trunk_stability_pushup", "rotary_stability"} assert set(SCORERS.keys()) == expected def test_dispatch_unknown_test(self): f = _make_features("unknown_test") r = score_test(f) assert r.confidence == 0.0 # ─── Hurdle Step ────────────────────────────────────────────────────────────── class TestHurdleStep: def test_score_3_good_form(self): f = _make_features("hurdle_step", angles={ "step_hip_flexion_deg": 100.0, "stance_knee_angle_deg": 175.0, "shoulder_tilt_deg": 5.0, }, alignments={"trunk_stable": True, "stance_knee_extended": True}) r = score_hurdle_step(f) assert r.score == 3 def test_score_2_compensation(self): f = _make_features("hurdle_step", angles={ "step_hip_flexion_deg": 80.0, "stance_knee_angle_deg": 170.0, }, alignments={"trunk_stable": True, "stance_knee_extended": True}) r = 
score_hurdle_step(f) assert r.score == 2 def test_score_1_poor(self): f = _make_features("hurdle_step", angles={ "step_hip_flexion_deg": 50.0, "stance_knee_angle_deg": 140.0, }, alignments={"trunk_stable": False, "stance_knee_extended": False}) r = score_hurdle_step(f) assert r.score == 1 def test_never_scores_zero(self): f = _make_features("hurdle_step", angles={ "step_hip_flexion_deg": 30.0, }, alignments={"trunk_stable": False, "stance_knee_extended": False}) r = score_hurdle_step(f) assert r.score >= 1 # ─── Inline Lunge ───────────────────────────────────────────────────────────── class TestInlineLunge: def test_score_3_deep_and_aligned(self): f = _make_features("inline_lunge", angles={ "front_knee_flexion_deg": 85.0, "trunk_lean_from_vertical_deg": 5.0, }, alignments={"trunk_upright": True, "knee_over_ankle": True}) r = score_inline_lunge(f) assert r.score == 3 def test_score_1_shallow(self): f = _make_features("inline_lunge", angles={ "front_knee_flexion_deg": 140.0, }, alignments={"trunk_upright": False, "knee_over_ankle": False}) r = score_inline_lunge(f) assert r.score == 1 # ─── Shoulder Mobility ──────────────────────────────────────────────────────── class TestShoulderMobility: def test_score_3_close(self): f = _make_features("shoulder_mobility", angles={ "inter_fist_normalized": 0.25, }, alignments={"fists_within_one_hand": True, "fists_within_1_5_hand": True}) r = score_shoulder_mobility(f) assert r.score == 3 def test_score_2_moderate(self): f = _make_features("shoulder_mobility", angles={ "inter_fist_normalized": 0.45, }, alignments={"fists_within_one_hand": False, "fists_within_1_5_hand": True}) r = score_shoulder_mobility(f) assert r.score == 2 def test_score_1_far(self): f = _make_features("shoulder_mobility", angles={ "inter_fist_normalized": 0.8, }, alignments={"fists_within_one_hand": False, "fists_within_1_5_hand": False}) r = score_shoulder_mobility(f) assert r.score == 1 # ─── Active SLR 
# ─── Active SLR ───────────────────────────────────────────────────────────────

class TestActiveSLR:
    """Expected scores from ``score_active_slr`` for representative inputs."""

    def test_score_3_high_raise(self):
        """Raised leg passes the contralateral knee with the down leg flat → 3."""
        angle_values = {"raised_leg_angle_deg": 80.0}
        alignment_flags = {
            "past_contralateral_knee": True,
            "past_mid_thigh": True,
            "down_leg_flat": True,
        }
        features = _make_features(
            "active_slr", angles=angle_values, alignments=alignment_flags
        )
        outcome = score_active_slr(features)
        assert outcome.score == 3

    def test_score_2_moderate_raise(self):
        """Raised leg passes mid-thigh but not the contralateral knee → 2."""
        angle_values = {"raised_leg_angle_deg": 55.0}
        alignment_flags = {
            "past_contralateral_knee": False,
            "past_mid_thigh": True,
            "down_leg_flat": True,
        }
        features = _make_features(
            "active_slr", angles=angle_values, alignments=alignment_flags
        )
        outcome = score_active_slr(features)
        assert outcome.score == 2

    def test_score_1_low_raise(self):
        """Raised leg passes neither landmark → 1."""
        angle_values = {"raised_leg_angle_deg": 30.0}
        alignment_flags = {
            "past_contralateral_knee": False,
            "past_mid_thigh": False,
            "down_leg_flat": True,
        }
        features = _make_features(
            "active_slr", angles=angle_values, alignments=alignment_flags
        )
        outcome = score_active_slr(features)
        assert outcome.score == 1


# ─── Trunk Stability Push-Up ─────────────────────────────────────────────────

class TestTrunkStabilityPushup:
    """Expected scores from ``score_trunk_stability_pushup``."""

    def test_score_3_rigid_hands_high(self):
        """Rigid body, no sag, hands at forehead → 3."""
        angle_values = {"max_sag_px": 10.0, "trunk_variance_px": 5.0}
        alignment_flags = {
            "body_rigid": True,
            "no_sag": True,
            "hands_at_forehead": True,
        }
        features = _make_features(
            "trunk_stability_pushup", angles=angle_values, alignments=alignment_flags
        )
        outcome = score_trunk_stability_pushup(features)
        assert outcome.score == 3

    def test_score_1_sag(self):
        """Sagging, non-rigid body → 1 even with hands at forehead."""
        angle_values = {"max_sag_px": 50.0, "trunk_variance_px": 25.0}
        alignment_flags = {
            "body_rigid": False,
            "no_sag": False,
            "hands_at_forehead": True,
        }
        features = _make_features(
            "trunk_stability_pushup", angles=angle_values, alignments=alignment_flags
        )
        outcome = score_trunk_stability_pushup(features)
        assert outcome.score == 1


# ─── Rotary Stability ────────────────────────────────────────────────────────

class TestRotaryStability:
    """Expected scores from ``score_rotary_stability``."""

    def test_score_2_stable(self):
        """All stability flags set → 2."""
        angle_values = {
            "trunk_stability_std_px": 8.0,
            "shoulder_level_diff_px": 10.0,
            "hip_level_diff_px": 12.0,
        }
        alignment_flags = {
            "trunk_stable": True,
            "shoulders_level": True,
            "hips_level": True,
        }
        features = _make_features(
            "rotary_stability", angles=angle_values, alignments=alignment_flags
        )
        outcome = score_rotary_stability(features)
        # Default to 2 (contralateral assumption)
        assert outcome.score == 2

    def test_score_1_unstable(self):
        """All stability flags cleared → 1."""
        angle_values = {
            "trunk_stability_std_px": 30.0,
            "shoulder_level_diff_px": 35.0,
            "hip_level_diff_px": 30.0,
        }
        alignment_flags = {
            "trunk_stable": False,
            "shoulders_level": False,
            "hips_level": False,
        }
        features = _make_features(
            "rotary_stability", angles=angle_values, alignments=alignment_flags
        )
        outcome = score_rotary_stability(features)
        assert outcome.score == 1


# ─── JudgeAgent fallback ─────────────────────────────────────────────────────

class TestJudgeAgent:
    """JudgeAgent behaviour with the judge disabled, unreachable, or mocked."""

    def test_fallback_when_judge_disabled(self, monkeypatch):
        """With ENABLE_JUDGE=False the rubric score is passed through, tagged rubric-only."""
        from formscout import config

        monkeypatch.setattr(config, "ENABLE_JUDGE", False)
        judge = JudgeAgent()

        features = _make_features(
            "deep_squat", angles={"left_femur_from_horizontal_deg": 70.0}
        )
        rubric = ScoreResult(score=3, rationale="all good", confidence=0.9)
        movement = MovementResult(test_name="deep_squat", side="na", confidence=1.0)

        verdict = judge.run(features, rubric, movement)

        assert isinstance(verdict, JudgeResult)
        assert verdict.score == 3
        assert "[rubric-only]" in verdict.rationale

    def test_fallback_when_server_unavailable(self, monkeypatch):
        """ENABLE_JUDGE=True but client reports unavailable → rubric fallback, no crash."""
        from unittest.mock import PropertyMock, patch
        from formscout import config

        monkeypatch.setattr(config, "ENABLE_JUDGE", True)
        judge = JudgeAgent()

        # ``available`` is patched on the client's class as a property mock.
        unavailable = patch.object(
            type(judge._client),
            "available",
            new_callable=PropertyMock,
            return_value=False,
        )
        with unavailable:
            features = _make_features("deep_squat")
            rubric = ScoreResult(score=2, rationale="heels up", confidence=0.8)
            movement = MovementResult(
                test_name="deep_squat", side="na", confidence=1.0
            )
            verdict = judge.run(features, rubric, movement)

        assert verdict.score == 2
        assert "[rubric-only]" in verdict.rationale

    def test_vlm_response_parsed_into_judge_result(self, monkeypatch):
        """ENABLE_JUDGE=True with an available client → the VLM JSON becomes a JudgeResult."""
        from unittest.mock import PropertyMock, patch
        from formscout import config

        monkeypatch.setattr(config, "ENABLE_JUDGE", True)
        judge = JudgeAgent()

        vlm_json = {
            "test": "deep_squat",
            "side": "na",
            "score": 2,
            "needs_human": False,
            "rationale": "Femur 5° above horizontal; 2D estimate.",
            "compensation_tags": ["forward_lean"],
            "corrective_hint": "Sit back into heels.",
            "confidence": 0.78,
        }

        client_up = patch.object(
            type(judge._client),
            "available",
            new_callable=PropertyMock,
            return_value=True,
        )
        canned_reply = patch.object(judge._client, "complete", return_value=vlm_json)
        with client_up, canned_reply:
            features = _make_features("deep_squat")
            rubric = ScoreResult(score=2, rationale="ok", confidence=0.8)
            movement = MovementResult(
                test_name="deep_squat", side="na", confidence=1.0
            )
            verdict = judge.run(features, rubric, movement)

        assert verdict.score == 2
        assert verdict.compensation_tags == ["forward_lean"]
        assert verdict.needs_human is False

    def test_vlm_needs_human_yields_no_score(self, monkeypatch):
        """A VLM reply with needs_human=True must yield score=None."""
        from unittest.mock import PropertyMock, patch
        from formscout import config

        monkeypatch.setattr(config, "ENABLE_JUDGE", True)
        judge = JudgeAgent()

        vlm_json = {
            "score": 1,
            "needs_human": True,
            "rationale": "Possible pain.",
            "confidence": 0.9,
        }

        client_up = patch.object(
            type(judge._client),
            "available",
            new_callable=PropertyMock,
            return_value=True,
        )
        canned_reply = patch.object(judge._client, "complete", return_value=vlm_json)
        with client_up, canned_reply:
            verdict = judge.run(
                _make_features("deep_squat"),
                ScoreResult(score=1, rationale="x", confidence=0.5),
                MovementResult(test_name="deep_squat", side="na", confidence=1.0),
            )

        assert verdict.needs_human is True
        assert verdict.score is None


# ─── LlamaCppClient (chat-completions endpoint) ──────────────────────────────

class TestLlamaCppClient:
    """Reply parsing and request shape of ``LlamaCppClient``."""

    def test_parse_plain_json(self):
        """A bare JSON object string is decoded as-is."""
        from formscout.serving.llama_cpp import LlamaCppClient

        parsed = LlamaCppClient._parse_json_reply('{"score": 3}')
        assert parsed == {"score": 3}

    def test_parse_fenced_json(self):
        """JSON wrapped in a Markdown ``json`` code fence is still decoded."""
        from formscout.serving.llama_cpp import LlamaCppClient

        fenced = '```json\n{"score": 2, "needs_human": false}\n```'
        parsed = LlamaCppClient._parse_json_reply(fenced)
        assert parsed == {"score": 2, "needs_human": False}

    def test_parse_non_json_returns_text(self):
        """Non-JSON replies are returned under a ``"text"`` key."""
        from formscout.serving.llama_cpp import LlamaCppClient

        parsed = LlamaCppClient._parse_json_reply("not json")
        assert parsed == {"text": "not json"}

    def test_complete_posts_chat_endpoint_with_images(self):
        """``complete`` posts to the chat-completions URL with text and image parts."""
        from unittest.mock import MagicMock, patch
        from formscout.serving.llama_cpp import LlamaCppClient

        client = LlamaCppClient(port=8080)

        fake_response = MagicMock()
        fake_response.json.return_value = {
            "choices": [{"message": {"content": '{"ok": true}'}}]
        }
        fake_response.raise_for_status.return_value = None

        with patch(
            "formscout.serving.llama_cpp.requests.post", return_value=fake_response
        ) as mock_post:
            reply = client.complete("score this", images=["aGVsbG8=" * 600])

        assert reply == {"ok": True}

        # The URL may have been passed positionally or as the ``url`` keyword.
        call = mock_post.call_args
        if call.args:
            url = call.args[0]
        else:
            url = call.kwargs.get("url")
        assert url.endswith("/v1/chat/completions")

        payload = call.kwargs["json"]
        content = payload["messages"][0]["content"]
        text_part, image_part = content[0], content[1]
        assert text_part == {"type": "text", "text": "score this"}
        assert image_part["type"] == "image_url"
        assert image_part["image_url"]["url"].startswith("data:image/jpeg;base64,")

    def test_complete_connection_error_returns_safe_dict(self):
        """A connection error from ``requests.post`` yields a dict with an ``"error"`` key."""
        from unittest.mock import patch
        import requests as _requests
        from formscout.serving.llama_cpp import LlamaCppClient

        client = LlamaCppClient(port=8080)
        with patch(
            "formscout.serving.llama_cpp.requests.post",
            side_effect=_requests.ConnectionError,
        ):
            reply = client.complete("hello")

        assert "error" in reply


# ─── ReportAgent ──────────────────────────────────────────────────────────────

class TestReportAgent:
    """Aggregation of per-test entries into a ``ReportResult``."""

    def test_single_test_report(self):
        """One scored entry produces one per-test row carrying that score."""
        reporter = ReportAgent()
        judged = JudgeResult(
            score=3,
            rationale="good",
            compensation_tags=[],
            corrective_hint="",
            confidence=0.9,
        )
        entry = {
            "movement": MovementResult(
                test_name="deep_squat", side="na", confidence=1.0
            ),
            "features": _make_features("deep_squat"),
            "rubric_score": ScoreResult(score=3, rationale="ok", confidence=0.9),
            "judge": judged,
            "side": "na",
        }

        report = reporter.run([entry])

        assert isinstance(report, ReportResult)
        assert len(report.per_test) == 1
        assert report.per_test[0]["score"] == 3

    def test_bilateral_reports_lower_score(self):
        """Left=3 and right=2 on one test → reported score 2 and one asymmetry of delta 1."""
        reporter = ReportAgent()

        left_entry = {
            "movement": MovementResult(
                test_name="hurdle_step", side="left", confidence=1.0
            ),
            "features": _make_features("hurdle_step", side="left"),
            "rubric_score": ScoreResult(score=3, rationale="ok", confidence=0.9),
            "judge": JudgeResult(
                score=3,
                rationale="",
                compensation_tags=[],
                corrective_hint="",
                confidence=0.9,
            ),
            "side": "left",
        }
        right_entry = {
            "movement": MovementResult(
                test_name="hurdle_step", side="right", confidence=1.0
            ),
            "features": _make_features("hurdle_step", side="right"),
            "rubric_score": ScoreResult(score=2, rationale="comp", confidence=0.8),
            "judge": JudgeResult(
                score=2,
                rationale="",
                compensation_tags=[],
                corrective_hint="",
                confidence=0.8,
            ),
            "side": "right",
        }

        report = reporter.run([left_entry, right_entry])

        assert report.per_test[0]["score"] == 2  # lower of 3 and 2
        assert len(report.asymmetries) == 1
        assert report.asymmetries[0]["delta"] == 1

    def test_bilateral_na_side_is_scored(self):
        """A bilateral test run once with side='na' (the UI default) must still
        count toward the composite, not be dropped as unscored."""
        reporter = ReportAgent()
        entry = {
            "movement": MovementResult(
                test_name="hurdle_step", side="na", confidence=1.0
            ),
            "features": _make_features("hurdle_step", side="na"),
            "rubric_score": ScoreResult(score=2, rationale="", confidence=0.9),
            "judge": JudgeResult(
                score=2,
                rationale="",
                compensation_tags=[],
                corrective_hint="",
                confidence=0.9,
            ),
            "side": "na",
        }

        report = reporter.run([entry])

        assert len(report.per_test) == 1
        assert report.per_test[0]["score"] == 2
        assert report.composite == 2
        assert report.asymmetries == []  # no L/R pair → no asymmetry

    def test_composite_none_when_unscored(self):
        """If the only entry has no judge score (needs_human), the composite is None."""
        reporter = ReportAgent()
        unscored = JudgeResult(
            score=None,
            rationale="pain",
            compensation_tags=[],
            corrective_hint="",
            confidence=0.0,
            needs_human=True,
        )
        entry = {
            "movement": MovementResult(
                test_name="deep_squat", side="na", confidence=1.0
            ),
            "features": _make_features("deep_squat"),
            "rubric_score": ScoreResult(score=1, rationale="", confidence=0.5),
            "judge": unscored,
            "side": "na",
        }

        report = reporter.run([entry])

        assert report.composite is None