Pozify / tests /test_pose_steps.py
tiena2cva's picture
refactor: remove sample_pose_cache module and streamline pose processing in pipeline
bb897eb
Raw
History Blame Contribute Delete
14.1 kB
from __future__ import annotations
# ruff: noqa: E402
import os
from pathlib import Path
from types import SimpleNamespace
import sys
import tempfile
import unittest
from unittest.mock import patch
import cv2
import numpy as np
FIXTURES_DIR = Path(__file__).resolve().parent / "fixtures"
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
from pozify.contracts import PoseFrame, PoseSequence, VideoManifest
from pozify.steps import pose_cleaning, pose_landmarker
from pozify.steps.pose_backends import (
PoseBackendUnavailableError,
PoseDetection,
landmark_list_to_dict,
)
from pozify.steps.pose_backends.mediapipe import MediaPipePoseBackend, _MediaPipeTasksPoseAdapter
def _landmark(x: float, y: float, z: float = 0.0, visibility: float = 0.9) -> SimpleNamespace:
return SimpleNamespace(x=x, y=y, z=z, visibility=visibility)
def _landmark_result(offset: float = 0.0) -> SimpleNamespace:
landmarks = [
_landmark(0.2 + offset + index * 0.001, 0.1 + index * 0.01, -0.01, 0.91)
for index in range(33)
]
return SimpleNamespace(
pose_landmarks=SimpleNamespace(landmark=landmarks),
pose_world_landmarks=SimpleNamespace(landmark=landmarks),
)
class _FakePose:
source = "fake_pose"
def __init__(self) -> None:
self.calls = 0
def __enter__(self) -> "_FakePose":
return self
def __exit__(self, *args: object) -> None:
return None
def detect(self, _frame: object, *, frame_index: int) -> PoseDetection:
self.calls += 1
result = _landmark_result(offset=self.calls * 0.01)
return PoseDetection(
landmarks=landmark_list_to_dict(result.pose_landmarks),
world_landmarks=landmark_list_to_dict(result.pose_world_landmarks),
source=self.source,
)
class PoseStepTests(unittest.TestCase):
def setUp(self) -> None:
self.temp_dir = tempfile.TemporaryDirectory()
def tearDown(self) -> None:
self.temp_dir.cleanup()
def _write_video(self, frame_count: int = 4) -> Path:
path = Path(self.temp_dir.name) / "pose.mp4"
writer = cv2.VideoWriter(str(path), cv2.VideoWriter_fourcc(*"mp4v"), 30.0, (640, 480))
self.assertTrue(writer.isOpened())
for frame_index in range(frame_count):
frame = np.full((480, 640, 3), 120 + frame_index, dtype=np.uint8)
writer.write(frame)
writer.release()
return path
def test_pose_landmarker_maps_to_coco17_landmarks_and_quality(self) -> None:
path = self._write_video()
manifest = VideoManifest(
video_path=str(path),
fps=30.0,
duration_sec=0.133,
total_frames=4,
sampled_frames=4,
width=640,
height=480,
codec="mp4v",
container="mp4",
brightness_mean=120.0,
blur_laplacian_var=100.0,
quality_warnings=[],
analysis_allowed=True,
)
sequence = pose_landmarker.run(manifest, backend=_FakePose())
self.assertEqual(len(sequence.frames), 4)
self.assertEqual(len(sequence.frames[0].landmarks), 17)
self.assertIn("left_ankle", sequence.frames[0].landmarks)
self.assertNotIn("left_foot_index", sequence.frames[0].landmarks)
self.assertEqual(len(sequence.frames[0].world_landmarks), 17)
self.assertEqual(sequence.frames[0].pose_quality["source"], "fake_pose")
self.assertEqual(sequence.frames[0].pose_quality["landmark_schema"], "coco17")
self.assertGreater(sequence.frames[0].pose_quality["mean_visibility"], 0.9)
self.assertTrue(sequence.frames[0].pose_quality["critical_landmarks_visible"])
self.assertEqual(sequence.pose_valid_ratio, 1.0)
def test_pose_landmarker_uses_dense_frames_for_real_backend(self) -> None:
path = self._write_video(frame_count=130)
manifest = VideoManifest(
video_path=str(path),
fps=30.0,
duration_sec=4.333,
total_frames=130,
sampled_frames=12,
width=640,
height=480,
codec="mp4v",
container="mp4",
brightness_mean=120.0,
blur_laplacian_var=100.0,
quality_warnings=[],
analysis_allowed=True,
)
sequence = pose_landmarker.run(manifest, backend=_FakePose())
self.assertEqual(len(sequence.frames), 130)
def test_pose_landmarker_returns_unavailable_sequence_for_missing_backend_libs(
self,
) -> None:
path = self._write_video()
manifest = VideoManifest(
video_path=str(path),
fps=30.0,
duration_sec=0.133,
total_frames=4,
sampled_frames=4,
width=640,
height=480,
codec="mp4v",
container="mp4",
brightness_mean=120.0,
blur_laplacian_var=100.0,
quality_warnings=[],
analysis_allowed=True,
)
with patch(
"pozify.steps.pose_landmarker.create_pose_backend",
side_effect=PoseBackendUnavailableError("missing libGLESv2.so.2"),
):
sequence = pose_landmarker.run(manifest, backend_name="mediapipe")
self.assertEqual(sequence.pose_valid_ratio, 0.0)
self.assertEqual(len(sequence.frames), 1)
self.assertEqual(
sequence.frames[0].pose_quality["pose_warning"],
"pose_backend_unavailable",
)
self.assertIn("libGLESv2", sequence.frames[0].pose_quality["reason"])
def test_dense_video_iteration_reads_sequentially_without_reseeking(self) -> None:
class FakeCapture:
def __init__(self) -> None:
self.set_calls: list[tuple[int, int]] = []
self.read_calls = 0
def isOpened(self) -> bool:
return True
def set(self, prop: int, value: int) -> None:
self.set_calls.append((prop, value))
def read(self) -> tuple[bool, object | None]:
if self.read_calls >= 3:
return False, None
self.read_calls += 1
return True, object()
def release(self) -> None:
return None
capture = FakeCapture()
manifest = VideoManifest(
video_path="fake.mp4",
fps=30.0,
duration_sec=0.1,
total_frames=3,
sampled_frames=3,
width=640,
height=480,
codec="mp4v",
container="mp4",
brightness_mean=120.0,
blur_laplacian_var=100.0,
quality_warnings=[],
analysis_allowed=True,
)
with patch("pozify.steps.pose_landmarker.cv2.VideoCapture", return_value=capture):
frames = list(pose_landmarker._iter_video_frames(manifest, sample_count=None))
self.assertEqual([frame_index for frame_index, _ in frames], [0, 1, 2])
seek_calls = [call for call in capture.set_calls if call[0] == cv2.CAP_PROP_POS_FRAMES]
self.assertEqual(seek_calls, [])
if hasattr(cv2, "CAP_PROP_ORIENTATION_AUTO"):
self.assertIn((cv2.CAP_PROP_ORIENTATION_AUTO, 1), capture.set_calls)
def test_pose_cleaning_interpolates_smooths_and_adds_normalized_fields(
self,
) -> None:
first_landmarks = _landmark_result(offset=0.0).pose_landmarks
last_landmarks = _landmark_result(offset=0.2).pose_landmarks
first = landmark_list_to_dict(first_landmarks)
last = landmark_list_to_dict(last_landmarks)
sequence = PoseSequence(
frames=[
PoseFrame(0, 0.0, first, first, {"mean_visibility": 0.9}),
PoseFrame(1, 0.033, {}, {}, {"mean_visibility": 0.0}),
PoseFrame(2, 0.067, last, last, {"mean_visibility": 0.9}),
],
normalized=False,
smoothing_method="none",
pose_valid_ratio=0.667,
)
cleaned = pose_cleaning.run(sequence)
self.assertTrue(cleaned.normalized)
self.assertEqual(cleaned.smoothing_method, "exponential_smoothing")
self.assertEqual(cleaned.pose_valid_ratio, 1.0)
self.assertTrue(cleaned.frames[1].pose_quality["interpolated"])
shoulder = cleaned.frames[1].landmarks["left_shoulder"]
self.assertIn("x", shoulder)
self.assertIn("smoothed_x", shoulder)
self.assertIn("normalized_x", shoulder)
self.assertTrue(cleaned.frames[1].pose_quality["normalized"])
self.assertIn("smoothed_x", cleaned.frames[1].world_landmarks["left_shoulder"])
self.assertIn("normalized_z", cleaned.frames[1].world_landmarks["left_shoulder"])
@unittest.skipUnless(
os.getenv("POZIFY_RUN_REAL_POSE_TESTS") == "1",
"set POZIFY_RUN_REAL_POSE_TESTS=1 to run the real MediaPipe fixture smoke test",
)
def test_real_sample_mov_extracts_pose_landmarks(self) -> None:
path = FIXTURES_DIR / "sample.MOV"
self.assertTrue(path.exists(), path)
from pozify.steps import video_qc
manifest = video_qc.run(str(path))
sequence = pose_landmarker.run(manifest, mock=False, backend_name="mediapipe")
cleaned = pose_cleaning.run(sequence)
self.assertTrue(manifest.analysis_allowed)
self.assertGreater(len(sequence.frames), 0)
self.assertGreater(sequence.pose_valid_ratio, 0.0)
self.assertEqual(len(sequence.frames[0].landmarks), 17)
self.assertEqual(sequence.frames[0].pose_quality["source"], "mediapipe_pose")
self.assertTrue(cleaned.normalized)
class MediaPipeTasksDelegateTests(unittest.TestCase):
def _fake_mediapipe(self, *, fail_gpu: bool = False) -> SimpleNamespace:
class Delegate:
CPU = "cpu"
GPU = "gpu"
class BaseOptions:
def __init__(self, *, model_asset_path: str, delegate: str) -> None:
self.model_asset_path = model_asset_path
self.delegate = delegate
BaseOptions.Delegate = Delegate
class PoseLandmarkerOptions:
def __init__(self, *, base_options: BaseOptions, **_kwargs: object) -> None:
self.base_options = base_options
class PoseLandmarker:
@staticmethod
def create_from_options(options: PoseLandmarkerOptions) -> object:
if fail_gpu and options.base_options.delegate == "gpu":
raise RuntimeError("gpu delegate unavailable")
return SimpleNamespace(delegate=options.base_options.delegate)
return SimpleNamespace(
tasks=SimpleNamespace(
BaseOptions=BaseOptions,
vision=SimpleNamespace(
PoseLandmarkerOptions=PoseLandmarkerOptions,
PoseLandmarker=PoseLandmarker,
RunningMode=SimpleNamespace(IMAGE="image"),
),
)
)
def _adapter(self, fake_mediapipe: SimpleNamespace) -> _MediaPipeTasksPoseAdapter:
adapter = object.__new__(_MediaPipeTasksPoseAdapter)
adapter._mp = fake_mediapipe
return adapter
def test_mediapipe_tasks_prefers_cpu_outside_zero_gpu(self) -> None:
adapter = self._adapter(self._fake_mediapipe())
torch_without_cuda = SimpleNamespace(cuda=SimpleNamespace(is_available=lambda: False))
with (
patch.dict(os.environ, {}, clear=True),
patch.dict(sys.modules, {"torch": torch_without_cuda}),
):
self.assertEqual(adapter._preferred_delegate(), "cpu")
def test_mediapipe_tasks_defaults_to_cpu_inside_zero_gpu(self) -> None:
adapter = self._adapter(self._fake_mediapipe())
with patch.dict(os.environ, {"SPACES_ZERO_GPU": "1"}, clear=True):
self.assertEqual(adapter._preferred_delegate(), "cpu")
def test_mediapipe_tasks_auto_prefers_gpu_inside_zero_gpu(self) -> None:
adapter = self._adapter(self._fake_mediapipe())
with patch.dict(
os.environ,
{"SPACES_ZERO_GPU": "1", "POZIFY_MEDIAPIPE_DELEGATE": "auto"},
clear=True,
):
self.assertEqual(adapter._preferred_delegate(), "gpu")
def test_mediapipe_delegate_env_can_force_gpu(self) -> None:
adapter = self._adapter(self._fake_mediapipe())
with patch.dict(os.environ, {"POZIFY_MEDIAPIPE_DELEGATE": "gpu"}, clear=True):
self.assertEqual(adapter._preferred_delegate(), "gpu")
def test_mediapipe_tasks_falls_back_to_cpu_when_gpu_delegate_fails(self) -> None:
adapter = self._adapter(self._fake_mediapipe(fail_gpu=True))
with patch.dict(os.environ, {"SPACES_ZERO_GPU": "1"}, clear=True):
landmarker = adapter._create_landmarker(Path("pose.task"))
self.assertEqual(landmarker.delegate, "cpu")
def test_mediapipe_backend_prefers_tasks_when_legacy_solution_exists(self) -> None:
fake_mediapipe = self._fake_mediapipe()
class LegacyPose:
def __init__(self, **_kwargs: object) -> None:
raise AssertionError("legacy CPU-only pose solution should not be used")
fake_mediapipe.solutions = SimpleNamespace(
pose=SimpleNamespace(Pose=LegacyPose),
)
backend = object.__new__(MediaPipePoseBackend)
with (
patch.dict(os.environ, {}, clear=True),
patch.dict(sys.modules, {"mediapipe": fake_mediapipe}),
patch(
"pozify.steps.pose_backends.mediapipe._ensure_pose_task_model",
return_value=Path("pose.task"),
),
):
pose = backend._create_pose()
self.assertIsInstance(pose, _MediaPipeTasksPoseAdapter)
self.assertEqual(pose._landmarker.delegate, "cpu")
if __name__ == "__main__":
unittest.main()