Pozify / tests /test_exercise_classifier.py
tiena2cva's picture
refactor: remove sample_pose_cache module and streamline pose processing in pipeline
bb897eb
Raw
History Blame Contribute Delete
22.4 kB
from __future__ import annotations
import json
import math
import os
from pathlib import Path
import sys
import tempfile
import unittest
from unittest.mock import patch
import joblib
import numpy as np
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
from pozify.contracts import PoseFrame, PoseSequence, UserProfile, validate_contract
from pozify.ml.exercise_router_evaluation import (
evaluate_router_predictions,
select_router_candidate,
)
from pozify.ml.exercise_router_features import (
FEATURE_SCHEMA,
ROUTER_LABELS,
ROUTER_LANDMARK_SCHEMA,
extract_router_windows,
window_tensor_feature_names,
window_vector_feature_names,
)
from pozify.ml.exercise_router_inference import (
DEFAULT_HF_REPO_ID,
HF_DISABLE_ENV,
RouterModelBundle,
WindowRouterPrediction,
aggregate_window_predictions,
load_router_model,
load_router_model_from_hf,
)
from pozify.steps import exercise_classifier
from pozify.steps.pose_backends.landmarks import LANDMARK_NAMES
def _profile(intended_exercise: str = "auto") -> UserProfile:
return UserProfile(
goal="beginner_practice",
experience_level="beginner",
intended_exercise=intended_exercise,
intended_variation=None,
known_limitations=[],
equipment="bodyweight",
)
def _base_landmarks(visibility: float) -> dict[str, dict[str, float]]:
return {
name: {
"x": 0.5,
"y": 0.5,
"z": 0.0,
"visibility": visibility,
"normalized_x": 0.0,
"normalized_y": 0.0,
"normalized_z": 0.0,
}
for name in LANDMARK_NAMES
}
def _set_landmark(
landmarks: dict[str, dict[str, float]],
name: str,
x: float,
y: float,
visibility: float,
) -> None:
landmarks[name].update(
{
"x": x,
"y": y,
"z": 0.0,
"visibility": visibility,
"normalized_x": x - 0.5,
"normalized_y": y - 0.5,
"normalized_z": 0.0,
}
)
def _landmarks_for_exercise(
exercise: str, phase: float, visibility: float
) -> dict[str, dict[str, float]]:
landmarks = _base_landmarks(visibility)
wave = (1.0 - math.cos(2.0 * math.pi * phase)) / 2.0
if exercise == "squat":
_set_landmark(landmarks, "left_shoulder", 0.42, 0.3 + wave * 0.04, visibility)
_set_landmark(landmarks, "right_shoulder", 0.58, 0.3 + wave * 0.04, visibility)
_set_landmark(landmarks, "left_hip", 0.43, 0.52 + wave * 0.16, visibility)
_set_landmark(landmarks, "right_hip", 0.57, 0.52 + wave * 0.16, visibility)
_set_landmark(landmarks, "left_knee", 0.42 + wave * 0.05, 0.72, visibility)
_set_landmark(landmarks, "right_knee", 0.58 - wave * 0.05, 0.72, visibility)
_set_landmark(landmarks, "left_ankle", 0.41, 0.92, visibility)
_set_landmark(landmarks, "right_ankle", 0.59, 0.92, visibility)
elif exercise == "shoulder_press":
_set_landmark(landmarks, "left_shoulder", 0.42, 0.42, visibility)
_set_landmark(landmarks, "right_shoulder", 0.58, 0.42, visibility)
_set_landmark(landmarks, "left_hip", 0.43, 0.7, visibility)
_set_landmark(landmarks, "right_hip", 0.57, 0.7, visibility)
_set_landmark(landmarks, "left_elbow", 0.4 - wave * 0.05, 0.62 - wave * 0.12, visibility)
_set_landmark(landmarks, "right_elbow", 0.6 + wave * 0.05, 0.62 - wave * 0.12, visibility)
_set_landmark(landmarks, "left_wrist", 0.4, 0.78 - wave * 0.34, visibility)
_set_landmark(landmarks, "right_wrist", 0.6, 0.78 - wave * 0.34, visibility)
else:
_set_landmark(landmarks, "left_shoulder", 0.3, 0.38 + wave * 0.14, visibility)
_set_landmark(landmarks, "right_shoulder", 0.7, 0.38 + wave * 0.14, visibility)
_set_landmark(landmarks, "left_elbow", 0.36, 0.47 + wave * 0.08, visibility)
_set_landmark(landmarks, "right_elbow", 0.64, 0.47 + wave * 0.08, visibility)
_set_landmark(landmarks, "left_wrist", 0.34, 0.52, visibility)
_set_landmark(landmarks, "right_wrist", 0.66, 0.52, visibility)
_set_landmark(landmarks, "left_hip", 0.42, 0.48 + wave * 0.14, visibility)
_set_landmark(landmarks, "right_hip", 0.58, 0.48 + wave * 0.14, visibility)
_set_landmark(landmarks, "left_ankle", 0.44, 0.56 + wave * 0.14, visibility)
_set_landmark(landmarks, "right_ankle", 0.56, 0.56 + wave * 0.14, visibility)
return landmarks
def _sequence(
exercise: str = "push_up", frame_count: int = 45, visibility: float = 0.95
) -> PoseSequence:
frames = [
PoseFrame(
frame_index=index,
timestamp_sec=round(index / 30.0, 3),
landmarks=_landmarks_for_exercise(exercise, index / 24.0, visibility),
world_landmarks={},
pose_quality={"mean_visibility": visibility, "normalized": True},
)
for index in range(frame_count)
]
return PoseSequence(
frames=frames,
normalized=True,
smoothing_method="exponential_smoothing",
pose_valid_ratio=1.0 if visibility >= 0.2 else 0.4,
)
def _static_sequence(frame_count: int = 45, visibility: float = 0.95) -> PoseSequence:
frames = [
PoseFrame(
frame_index=index,
timestamp_sec=round(index / 30.0, 3),
landmarks=_landmarks_for_exercise("push_up", 0.0, visibility),
world_landmarks={},
pose_quality={"mean_visibility": visibility, "normalized": True},
)
for index in range(frame_count)
]
return PoseSequence(
frames=frames,
normalized=True,
smoothing_method="exponential_smoothing",
pose_valid_ratio=1.0 if visibility >= 0.2 else 0.4,
)
class _FakePushUpModel:
classes_ = np.asarray(ROUTER_LABELS)
def predict_proba(self, values: np.ndarray) -> np.ndarray:
return np.tile(np.asarray([[0.03, 0.91, 0.02, 0.04]]), (values.shape[0], 1))
def _fake_router_artifact() -> dict[str, object]:
return {
"model": _FakePushUpModel(),
"labels": list(ROUTER_LABELS),
"model_kind": "baseline",
"feature_schema": FEATURE_SCHEMA,
"landmark_schema": ROUTER_LANDMARK_SCHEMA,
"input_size": len(window_vector_feature_names()),
}
class ExerciseRouterFeatureTests(unittest.TestCase):
def test_extracts_windows_for_supported_exercise_motion(self) -> None:
for exercise in ("squat", "push_up", "shoulder_press"):
with self.subTest(exercise=exercise):
windows = extract_router_windows(_sequence(exercise))
self.assertGreaterEqual(len(windows), 2)
self.assertEqual(windows[0].tensor.shape, (30, len(window_tensor_feature_names())))
self.assertEqual(windows[0].vector.shape, (len(window_vector_feature_names()),))
self.assertGreater(windows[0].mean_visibility, 0.9)
def test_empty_and_low_visibility_sequences_do_not_produce_windows(self) -> None:
empty = PoseSequence(
frames=[],
normalized=True,
smoothing_method="none",
pose_valid_ratio=0.0,
)
self.assertEqual(extract_router_windows(empty), [])
self.assertEqual(extract_router_windows(_sequence(visibility=0.05)), [])
def test_missing_landmark_is_zero_in_router_features(self) -> None:
sequence = _sequence("push_up")
frame = sequence.frames[0]
frame.landmarks.pop("nose")
windows = extract_router_windows(sequence, min_mean_visibility=0.0)
self.assertGreater(len(windows), 0)
nose_visibility_index = window_tensor_feature_names().index("nose_visibility")
self.assertEqual(windows[0].tensor[0, nose_visibility_index], 0.0)
def test_landmarks_without_visibility_still_produce_router_windows(self) -> None:
sequence = _sequence("push_up")
frames = [
PoseFrame(
frame.frame_index,
frame.timestamp_sec,
{
name: {key: value for key, value in landmark.items() if key != "visibility"}
for name, landmark in frame.landmarks.items()
},
frame.world_landmarks,
frame.pose_quality,
)
for frame in sequence.frames
]
no_visibility_sequence = PoseSequence(
frames=frames,
normalized=sequence.normalized,
smoothing_method=sequence.smoothing_method,
pose_valid_ratio=sequence.pose_valid_ratio,
)
windows = extract_router_windows(no_visibility_sequence)
self.assertGreater(len(windows), 0)
class ExerciseRouterAggregationTests(unittest.TestCase):
def test_aggregates_confident_windows(self) -> None:
predictions = [
WindowRouterPrediction(
0.0,
1.0,
"push_up",
0.91,
{"squat": 0.03, "push_up": 0.91, "shoulder_press": 0.02, "unknown": 0.04},
),
WindowRouterPrediction(
0.5,
1.5,
"push_up",
0.88,
{"squat": 0.05, "push_up": 0.88, "shoulder_press": 0.02, "unknown": 0.05},
),
]
aggregated = aggregate_window_predictions(predictions)
self.assertEqual(aggregated.label, "push_up")
self.assertFalse(aggregated.fallback_required)
self.assertGreaterEqual(aggregated.confidence, 0.85)
def test_low_confidence_and_inconsistent_windows_fallback_to_unknown(self) -> None:
low_confidence = [
WindowRouterPrediction(
0.0,
1.0,
"push_up",
0.45,
{"squat": 0.25, "push_up": 0.45, "shoulder_press": 0.15, "unknown": 0.15},
),
WindowRouterPrediction(
0.5,
1.5,
"push_up",
0.48,
{"squat": 0.22, "push_up": 0.48, "shoulder_press": 0.15, "unknown": 0.15},
),
]
inconsistent = [
WindowRouterPrediction(
0.0,
1.0,
"push_up",
0.8,
{"squat": 0.05, "push_up": 0.8, "shoulder_press": 0.1, "unknown": 0.05},
),
WindowRouterPrediction(
0.5,
1.5,
"squat",
0.8,
{"squat": 0.8, "push_up": 0.05, "shoulder_press": 0.1, "unknown": 0.05},
),
WindowRouterPrediction(
1.0,
2.0,
"shoulder_press",
0.8,
{"squat": 0.1, "push_up": 0.05, "shoulder_press": 0.8, "unknown": 0.05},
),
]
self.assertTrue(aggregate_window_predictions(low_confidence).fallback_required)
self.assertEqual(aggregate_window_predictions(low_confidence).label, "unknown")
self.assertTrue(aggregate_window_predictions(inconsistent).fallback_required)
self.assertEqual(aggregate_window_predictions(inconsistent).label, "unknown")
class ExerciseRouterModelLoadingTests(unittest.TestCase):
def test_selection_file_controls_active_artifact(self) -> None:
with tempfile.TemporaryDirectory() as temp_dir:
model_dir = Path(temp_dir)
artifact_path = model_dir / "selected.joblib"
joblib.dump(_fake_router_artifact(), artifact_path)
(model_dir / "router_selection.json").write_text(
json.dumps({"selected_artifact": artifact_path.name}),
encoding="utf-8",
)
with patch.dict(os.environ, {HF_DISABLE_ENV: "1"}):
bundle = load_router_model(model_dir)
self.assertIsNotNone(bundle)
assert bundle is not None
self.assertEqual(bundle.labels, tuple(ROUTER_LABELS))
result = exercise_classifier.run(
_sequence("push_up"),
_profile("auto"),
mock=False,
model_bundle=bundle,
)
self.assertEqual(result.exercise, "push_up")
self.assertFalse(result.fallback_required)
def test_selection_file_accepts_legacy_selected_model_field(self) -> None:
with tempfile.TemporaryDirectory() as temp_dir:
model_dir = Path(temp_dir)
artifact_path = model_dir / "selected.joblib"
joblib.dump(_fake_router_artifact(), artifact_path)
(model_dir / "router_selection.json").write_text(
json.dumps({"selected_model": artifact_path.name}),
encoding="utf-8",
)
with patch.dict(os.environ, {HF_DISABLE_ENV: "1"}):
bundle = load_router_model(model_dir)
self.assertIsNotNone(bundle)
assert bundle is not None
self.assertEqual(bundle.labels, tuple(ROUTER_LABELS))
def test_hf_loader_honors_selection_file(self) -> None:
with tempfile.TemporaryDirectory() as temp_dir:
model_dir = Path(temp_dir)
artifact_path = model_dir / "router.joblib"
selection_path = model_dir / "router_selection.json"
joblib.dump(_fake_router_artifact(), artifact_path)
selection_path.write_text(
json.dumps({"selected_artifact": artifact_path.name}),
encoding="utf-8",
)
def fake_download(*, repo_id: str, filename: str, revision: str | None) -> Path:
self.assertEqual(repo_id, "owner/pozify-router")
self.assertEqual(revision, "main")
return {"router_selection.json": selection_path, "router.joblib": artifact_path}[
filename
]
with patch(
"pozify.ml.exercise_router_inference._hf_hub_download",
side_effect=fake_download,
):
bundle = load_router_model_from_hf("owner/pozify-router", revision="main")
self.assertIsNotNone(bundle)
assert bundle is not None
self.assertEqual(bundle.model_kind, "baseline")
self.assertEqual(bundle.labels, tuple(ROUTER_LABELS))
def test_hf_loader_defaults_to_pozify_router_repo(self) -> None:
with tempfile.TemporaryDirectory() as temp_dir:
model_dir = Path(temp_dir)
artifact_path = model_dir / "router.joblib"
selection_path = model_dir / "router_selection.json"
joblib.dump(_fake_router_artifact(), artifact_path)
selection_path.write_text(
json.dumps({"selected_artifact": artifact_path.name}),
encoding="utf-8",
)
def fake_download(*, repo_id: str, filename: str, revision: str | None) -> Path:
self.assertEqual(repo_id, DEFAULT_HF_REPO_ID)
self.assertIsNone(revision)
return {"router_selection.json": selection_path, "router.joblib": artifact_path}[
filename
]
with (
patch.dict(os.environ, {}, clear=True),
patch(
"pozify.ml.exercise_router_inference._hf_hub_download",
side_effect=fake_download,
),
):
bundle = load_router_model_from_hf()
self.assertIsNotNone(bundle)
assert bundle is not None
self.assertEqual(bundle.model_kind, "baseline")
self.assertEqual(bundle.labels, tuple(ROUTER_LABELS))
def test_hf_loader_reads_local_env_before_selecting_repo(self) -> None:
with tempfile.TemporaryDirectory() as temp_dir:
root = Path(temp_dir)
model_dir = root / "model"
model_dir.mkdir()
artifact_path = model_dir / "router.joblib"
selection_path = model_dir / "router_selection.json"
joblib.dump(_fake_router_artifact(), artifact_path)
selection_path.write_text(
json.dumps({"selected_artifact": artifact_path.name}),
encoding="utf-8",
)
(root / ".env").write_text(
"POZIFY_ROUTER_HF_REPO_ID=owner/env-router\n",
encoding="utf-8",
)
original_cwd = Path.cwd()
def fake_download(*, repo_id: str, filename: str, revision: str | None) -> Path:
self.assertEqual(repo_id, "owner/env-router")
self.assertIsNone(revision)
return {"router_selection.json": selection_path, "router.joblib": artifact_path}[
filename
]
try:
os.chdir(root)
with (
patch.dict(os.environ, {}, clear=True),
patch(
"pozify.ml.exercise_router_inference._hf_hub_download",
side_effect=fake_download,
),
):
bundle = load_router_model_from_hf()
finally:
os.chdir(original_cwd)
self.assertIsNotNone(bundle)
assert bundle is not None
self.assertEqual(bundle.labels, tuple(ROUTER_LABELS))
def test_old_router_artifact_without_schema_is_rejected(self) -> None:
with tempfile.TemporaryDirectory() as temp_dir:
model_dir = Path(temp_dir)
joblib.dump(
{
"model": _FakePushUpModel(),
"labels": list(ROUTER_LABELS),
"model_kind": "baseline",
},
model_dir / "router.joblib",
)
with patch.dict(os.environ, {HF_DISABLE_ENV: "1"}):
with self.assertRaises(ValueError):
load_router_model(model_dir)
class ExerciseClassifierStepTests(unittest.TestCase):
def test_manual_override_bypasses_model_and_validates_contract(self) -> None:
result = exercise_classifier.run(
_sequence("squat"),
_profile("push_up"),
mock=False,
)
self.assertEqual(result.exercise, "push_up")
self.assertFalse(result.fallback_required)
self.assertEqual(result.confidence, 0.98)
validate_contract("exercise_classification.json", result)
def test_missing_model_uses_pose_heuristic_for_clear_motion(self) -> None:
with tempfile.TemporaryDirectory() as temp_dir:
with patch.dict(os.environ, {HF_DISABLE_ENV: "1"}):
result = exercise_classifier.run(
_sequence("push_up"),
_profile("auto"),
mock=False,
model_dir=Path(temp_dir),
)
self.assertEqual(result.exercise, "push_up")
self.assertFalse(result.fallback_required)
self.assertGreaterEqual(result.confidence, 0.65)
validate_contract("exercise_classification.json", result)
def test_missing_model_keeps_static_motion_unknown(self) -> None:
with tempfile.TemporaryDirectory() as temp_dir:
with patch.dict(os.environ, {HF_DISABLE_ENV: "1"}):
result = exercise_classifier.run(
_static_sequence(),
_profile("auto"),
mock=False,
model_dir=Path(temp_dir),
)
self.assertEqual(result.exercise, "unknown")
self.assertTrue(result.fallback_required)
validate_contract("exercise_classification.json", result)
def test_fake_model_routes_and_persists_window_predictions(self) -> None:
result = exercise_classifier.run(
_sequence("push_up"),
_profile("auto"),
mock=False,
model_bundle=RouterModelBundle(model=_FakePushUpModel(), labels=ROUTER_LABELS),
)
self.assertEqual(result.exercise, "push_up")
self.assertFalse(result.fallback_required)
self.assertGreater(result.confidence, 0.9)
self.assertGreater(len(result.window_predictions), 0)
self.assertEqual(
sorted(result.window_predictions[0]),
["confidence", "end_sec", "label", "start_sec"],
)
validate_contract("exercise_classification.json", result)
def test_low_pose_valid_ratio_falls_back_before_model_inference(self) -> None:
sequence = _sequence("push_up", visibility=0.95)
sequence = PoseSequence(
frames=sequence.frames,
normalized=sequence.normalized,
smoothing_method=sequence.smoothing_method,
pose_valid_ratio=0.4,
)
result = exercise_classifier.run(
sequence,
_profile("auto"),
mock=False,
model_bundle=RouterModelBundle(model=_FakePushUpModel(), labels=ROUTER_LABELS),
)
self.assertEqual(result.exercise, "unknown")
self.assertTrue(result.fallback_required)
class ExerciseRouterEvaluationTests(unittest.TestCase):
def test_evaluation_reports_accuracy_and_confusion_matrix(self) -> None:
evaluation = evaluate_router_predictions(
["squat", "push_up", "shoulder_press", "unknown"],
["squat", "push_up", "unknown", "unknown"],
)
self.assertEqual(evaluation.accuracy, 0.75)
self.assertEqual(evaluation.confusion_matrix["shoulder_press"]["unknown"], 1)
self.assertEqual(evaluation.unknown_rejection_rate, 1.0)
def test_prefers_temporal_when_available(self) -> None:
baseline = {"name": "baseline", "accuracy": 0.91, "unknown_rejection_rate": 0.8}
temporal = {"name": "temporal", "accuracy": 0.90, "unknown_rejection_rate": 0.7}
self.assertEqual(select_router_candidate([baseline, temporal]), temporal)
def test_selects_baseline_when_temporal_is_missing(self) -> None:
baseline = {"name": "baseline", "accuracy": 0.91, "unknown_rejection_rate": 0.8}
self.assertEqual(select_router_candidate([baseline]), baseline)
if __name__ == "__main__":
unittest.main()