Pozify / src /pozify /steps /exercise_classifier.py
tiena2cva's picture
refactor: remove sample_pose_cache module and streamline pose processing in pipeline
bb897eb
Raw
History Blame Contribute Delete
11.4 kB
from __future__ import annotations
from collections.abc import Iterable
from pathlib import Path
from pozify.contracts import ExerciseClassification, PoseFrame, PoseSequence, UserProfile
from pozify.exercise_catalog import DEFAULT_AUTO_EXERCISE
from pozify.hf_spaces import default_spaces_gpu_duration, spaces_gpu
from pozify.ml.exercise_router_features import RouterWindow, extract_router_windows
from pozify.ml.exercise_router_inference import (
MIN_POSE_VALID_RATIO,
RouterModelBundle,
aggregate_window_predictions,
contract_window_predictions,
load_router_model,
predict_window_probabilities,
window_predictions_from_scores,
)
from pozify.steps.rep_signals import angle_deg, smooth_signal
MIN_HEURISTIC_CONFIDENCE = 0.58
MIN_HEURISTIC_MARGIN = 0.14
def _sample_prediction_frames(sequence: PoseSequence, count: int = 4) -> list[object]:
if not sequence.frames:
return []
if len(sequence.frames) <= count:
return sequence.frames
last_index = len(sequence.frames) - 1
positions = sorted({round(index * last_index / (count - 1)) for index in range(count)})
return [sequence.frames[position] for position in positions]
def _fixed_classification(
sequence: PoseSequence,
*,
exercise: str,
confidence: float,
fallback_required: bool,
) -> ExerciseClassification:
return ExerciseClassification(
exercise=exercise, # type: ignore[arg-type]
confidence=confidence,
window_predictions=[
{
"start_sec": frame.timestamp_sec,
"end_sec": round(frame.timestamp_sec + 1.0, 3),
"label": exercise,
"confidence": confidence,
}
for frame in _sample_prediction_frames(sequence)
],
fallback_required=fallback_required,
)
def _manual_classification(sequence: PoseSequence, exercise: str) -> ExerciseClassification:
windows = extract_router_windows(sequence, min_mean_visibility=0.0)
if not windows:
return _fixed_classification(
sequence,
exercise=exercise,
confidence=0.98,
fallback_required=False,
)
return ExerciseClassification(
exercise=exercise, # type: ignore[arg-type]
confidence=0.98,
window_predictions=[
{
"start_sec": window.start_sec,
"end_sec": window.end_sec,
"label": exercise,
"confidence": 0.98,
}
for window in windows
],
fallback_required=False,
)
def _unknown_fallback(
window_predictions: list[dict[str, float | str]] | None = None,
*,
confidence: float = 0.0,
) -> ExerciseClassification:
return ExerciseClassification(
exercise="unknown",
confidence=round(confidence, 4),
window_predictions=window_predictions or [],
fallback_required=True,
)
def _clip01(value: float) -> float:
return max(0.0, min(1.0, value))
def _usable(values: Iterable[float | None]) -> list[float]:
return [float(value) for value in values if value is not None]
def _range(values: Iterable[float | None]) -> float:
usable_values = _usable(smooth_signal(list(values)))
if not usable_values:
return 0.0
return max(usable_values) - min(usable_values)
def _raw_axis(frame: PoseFrame, name: str, axis: str) -> float | None:
values = frame.landmarks.get(name) or frame.world_landmarks.get(name)
if values is None:
return None
value = values.get(
f"smoothed_{axis}",
values.get(axis, values.get(f"normalized_{axis}")),
)
return None if value is None else float(value)
def _mean_raw_axis(frame: PoseFrame, names: tuple[str, ...], axis: str) -> float | None:
values = _usable(_raw_axis(frame, name, axis) for name in names)
if not values:
return None
return sum(values) / len(values)
def _mean_abs_y_gap(
frames: list[PoseFrame],
first: tuple[str, ...],
second: tuple[str, ...],
) -> float:
gaps: list[float] = []
for frame in frames:
first_y = _mean_raw_axis(frame, first, "y")
second_y = _mean_raw_axis(frame, second, "y")
if first_y is None or second_y is None:
continue
gaps.append(abs(first_y - second_y))
return sum(gaps) / len(gaps) if gaps else 0.0
def _joint_bend_deg(frame: PoseFrame, triples: tuple[tuple[str, str, str], ...]) -> float | None:
values: list[float] = []
for first, middle, last in triples:
angle = angle_deg(frame, first, middle, last)
if angle is not None:
values.append(max(0.0, 180.0 - angle))
return sum(values) / len(values) if values else None
def _heuristic_score_rows(sequence: PoseSequence) -> dict[str, float]:
frames = [frame for frame in sequence.frames if frame.landmarks or frame.world_landmarks]
if len(frames) < 9:
return {"squat": 0.0, "push_up": 0.0, "shoulder_press": 0.0}
shoulder_y = [
_mean_raw_axis(frame, ("left_shoulder", "right_shoulder"), "y") for frame in frames
]
hip_y = [_mean_raw_axis(frame, ("left_hip", "right_hip"), "y") for frame in frames]
wrist_y = [_mean_raw_axis(frame, ("left_wrist", "right_wrist"), "y") for frame in frames]
knee_bend = [
_joint_bend_deg(
frame,
(
("left_hip", "left_knee", "left_ankle"),
("right_hip", "right_knee", "right_ankle"),
),
)
for frame in frames
]
hip_bend = [
_joint_bend_deg(
frame,
(
("left_shoulder", "left_hip", "left_knee"),
("right_shoulder", "right_hip", "right_knee"),
),
)
for frame in frames
]
elbow_bend = [
_joint_bend_deg(
frame,
(
("left_shoulder", "left_elbow", "left_wrist"),
("right_shoulder", "right_elbow", "right_wrist"),
),
)
for frame in frames
]
shoulder_range = _range(shoulder_y)
hip_range = _range(hip_y)
wrist_range = _range(wrist_y)
chest_range = (shoulder_range + hip_range) / 2.0
knee_bend_range = _range(knee_bend)
hip_bend_range = _range(hip_bend)
elbow_bend_range = _range(elbow_bend)
shoulder_hip_gap = _mean_abs_y_gap(
frames,
("left_shoulder", "right_shoulder"),
("left_hip", "right_hip"),
)
hip_ankle_gap = _mean_abs_y_gap(
frames,
("left_hip", "right_hip"),
("left_ankle", "right_ankle"),
)
standing_score = _clip01((shoulder_hip_gap + hip_ankle_gap) / 0.45)
plank_score = (
_clip01((0.28 - shoulder_hip_gap) / 0.28) + _clip01((0.24 - hip_ankle_gap) / 0.24)
) / 2.0
wrist_not_dominant = _clip01(1.0 - wrist_range / max(hip_range + 0.05, 0.05))
wrist_stable_for_push = _clip01(1.0 - wrist_range / max(chest_range + 0.04, 0.04))
body_still_for_press = _clip01(1.0 - max(shoulder_range, hip_range) / max(wrist_range, 0.05))
lower_body_bend = max(_clip01(knee_bend_range / 45.0), _clip01(hip_bend_range / 45.0))
push_up_geometry = 0.55 + 0.45 * plank_score
press_lower_body_gate = 1.0 - 0.55 * lower_body_bend
squat_score = standing_score * (
0.42 * _clip01(knee_bend_range / 45.0)
+ 0.24 * _clip01(hip_bend_range / 45.0)
+ 0.22 * _clip01(hip_range / 0.10)
+ 0.12 * wrist_not_dominant
)
push_up_score = (
0.45 * _clip01(elbow_bend_range / 55.0)
+ 0.25 * _clip01(chest_range / 0.08)
+ 0.20 * plank_score
+ 0.10 * wrist_stable_for_push
) * push_up_geometry
shoulder_press_score = press_lower_body_gate * (
0.40 * _clip01(wrist_range / 0.16)
+ 0.25 * _clip01(elbow_bend_range / 55.0)
+ 0.20 * body_still_for_press
+ 0.15 * standing_score
)
return {
"squat": round(_clip01(squat_score), 4),
"push_up": round(_clip01(push_up_score), 4),
"shoulder_press": round(_clip01(shoulder_press_score), 4),
}
def _heuristic_classification(
sequence: PoseSequence,
windows: list[RouterWindow],
) -> ExerciseClassification:
scores = _heuristic_score_rows(sequence)
ranked = sorted(scores, key=lambda label: scores[label], reverse=True)
winning_label = ranked[0]
score_margin = scores[winning_label] - scores[ranked[1]]
if scores[winning_label] < MIN_HEURISTIC_CONFIDENCE or score_margin < MIN_HEURISTIC_MARGIN:
return _unknown_fallback(confidence=scores[winning_label])
confidence = round(
min(0.88, 0.55 + scores[winning_label] * 0.25 + min(score_margin, 0.4) * 0.20),
4,
)
if not windows:
return _fixed_classification(
sequence,
exercise=winning_label,
confidence=confidence,
fallback_required=False,
)
return ExerciseClassification(
exercise=winning_label, # type: ignore[arg-type]
confidence=confidence,
window_predictions=[
{
"start_sec": window.start_sec,
"end_sec": window.end_sec,
"label": winning_label,
"confidence": confidence,
}
for window in windows
],
fallback_required=False,
)
def _gpu_duration(*_args: object, **_kwargs: object) -> int:
return default_spaces_gpu_duration()
@spaces_gpu(duration=_gpu_duration)
def _predict_router_scores(
windows: list[RouterWindow],
model_dir: str,
) -> list[dict[str, float]]:
bundle = load_router_model(Path(model_dir))
if bundle is None:
return []
return predict_window_probabilities(bundle, windows)
def run(
sequence: PoseSequence,
profile: UserProfile,
*,
mock: bool = False,
model_bundle: RouterModelBundle | None = None,
model_dir: Path | None = None,
) -> ExerciseClassification:
if profile.intended_exercise != "auto":
return _manual_classification(sequence, profile.intended_exercise)
if mock:
return _fixed_classification(
sequence,
exercise=DEFAULT_AUTO_EXERCISE,
confidence=0.92,
fallback_required=False,
)
windows = extract_router_windows(sequence)
if not windows or sequence.pose_valid_ratio < MIN_POSE_VALID_RATIO:
return _unknown_fallback()
try:
if model_bundle is not None:
score_rows = predict_window_probabilities(model_bundle, windows)
else:
score_rows = _predict_router_scores(
windows,
str(model_dir or Path("models/exercise_router/active")),
)
if not score_rows:
return _heuristic_classification(sequence, windows)
except Exception:
return _heuristic_classification(sequence, windows)
predictions = window_predictions_from_scores(windows, score_rows)
window_payload = contract_window_predictions(predictions)
aggregated = aggregate_window_predictions(predictions)
return ExerciseClassification(
exercise=aggregated.label, # type: ignore[arg-type]
confidence=aggregated.confidence,
window_predictions=window_payload,
fallback_required=aggregated.fallback_required,
)