Commit
·
7efd2e7
1
Parent(s):
05f99bb
enhanced logs around missing frames
Browse files- backend/gradio_labanmovementanalysis/json_generator.py +1 -0
- backend/gradio_labanmovementanalysis/labanmovementanalysis.py +9 -0
- backend/gradio_labanmovementanalysis/pose_estimation.py +54 -10
- backend/gradio_labanmovementanalysis/video_utils.py +3 -0
- backend/gradio_labanmovementanalysis/visualizer.py +84 -29
- demo/app.py +3 -1
- yolo11n-pose.pt +3 -0
- yolov8n-pose.pt +3 -0
backend/gradio_labanmovementanalysis/json_generator.py
CHANGED
|
@@ -8,6 +8,7 @@ from datetime import datetime
|
|
| 8 |
|
| 9 |
from .pose_estimation import PoseResult, Keypoint
|
| 10 |
from .notation_engine import MovementMetrics, Direction, Intensity, Speed
|
|
|
|
| 11 |
|
| 12 |
|
| 13 |
def generate_json(
|
|
|
|
| 8 |
|
| 9 |
from .pose_estimation import PoseResult, Keypoint
|
| 10 |
from .notation_engine import MovementMetrics, Direction, Intensity, Speed
|
| 11 |
+
import numpy as np
|
| 12 |
|
| 13 |
|
| 14 |
def generate_json(
|
backend/gradio_labanmovementanalysis/labanmovementanalysis.py
CHANGED
|
@@ -51,6 +51,7 @@ class LabanMovementAnalysis(Component):
|
|
| 51 |
elem_classes: Optional[List[str]] = None,
|
| 52 |
render: bool = True,
|
| 53 |
**kwargs):
|
|
|
|
| 54 |
"""
|
| 55 |
Initialize the Laban Movement Analysis component.
|
| 56 |
|
|
@@ -89,6 +90,7 @@ class LabanMovementAnalysis(Component):
|
|
| 89 |
# SkateFormer features reserved for Version 2
|
| 90 |
|
| 91 |
def preprocess(self, payload: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
|
| 92 |
"""
|
| 93 |
Preprocess input from the frontend.
|
| 94 |
|
|
@@ -126,6 +128,7 @@ class LabanMovementAnalysis(Component):
|
|
| 126 |
return options
|
| 127 |
|
| 128 |
def postprocess(self, value: Any) -> Dict[str, Any]:
|
|
|
|
| 129 |
"""
|
| 130 |
Postprocess analysis results for the frontend.
|
| 131 |
|
|
@@ -153,6 +156,7 @@ class LabanMovementAnalysis(Component):
|
|
| 153 |
def process_video(self, video_input: Union[str, os.PathLike], model: str = DEFAULT_MODEL,
|
| 154 |
enable_visualization: bool = True,
|
| 155 |
include_keypoints: bool = False) -> Tuple[Dict[str, Any], Optional[str]]:
|
|
|
|
| 156 |
"""
|
| 157 |
Main processing function that performs pose analysis on a video.
|
| 158 |
|
|
@@ -266,6 +270,7 @@ class LabanMovementAnalysis(Component):
|
|
| 266 |
return json_output, visualization_path
|
| 267 |
|
| 268 |
def __call__(self, video_path: str, **kwargs) -> Tuple[Dict[str, Any], Optional[str]]:
|
|
|
|
| 269 |
"""
|
| 270 |
Make the component callable for easy use.
|
| 271 |
|
|
@@ -285,12 +290,14 @@ class LabanMovementAnalysis(Component):
|
|
| 285 |
# will be available in the next major release
|
| 286 |
|
| 287 |
def cleanup(self):
|
|
|
|
| 288 |
"""Clean up temporary files and resources."""
|
| 289 |
# Clean up video input handler
|
| 290 |
if hasattr(self, 'video_input'):
|
| 291 |
self.video_input.cleanup()
|
| 292 |
|
| 293 |
def example_payload(self) -> Dict[str, Any]:
|
|
|
|
| 294 |
"""Example input payload for documentation."""
|
| 295 |
return {
|
| 296 |
"video": {"path": "/path/to/video.mp4"},
|
|
@@ -300,6 +307,7 @@ class LabanMovementAnalysis(Component):
|
|
| 300 |
}
|
| 301 |
|
| 302 |
def example_value(self) -> Dict[str, Any]:
|
|
|
|
| 303 |
"""Example output value for documentation."""
|
| 304 |
return {
|
| 305 |
"json_output": {
|
|
@@ -348,6 +356,7 @@ class LabanMovementAnalysis(Component):
|
|
| 348 |
}
|
| 349 |
|
| 350 |
def api_info(self) -> Dict[str, Any]:
|
|
|
|
| 351 |
"""API information for the component."""
|
| 352 |
return {
|
| 353 |
"type": "composite",
|
|
|
|
| 51 |
elem_classes: Optional[List[str]] = None,
|
| 52 |
render: bool = True,
|
| 53 |
**kwargs):
|
| 54 |
+
print("[TRACE] LabanMovementAnalysis.__init__ called")
|
| 55 |
"""
|
| 56 |
Initialize the Laban Movement Analysis component.
|
| 57 |
|
|
|
|
| 90 |
# SkateFormer features reserved for Version 2
|
| 91 |
|
| 92 |
def preprocess(self, payload: Dict[str, Any]) -> Dict[str, Any]:
|
| 93 |
+
print("[TRACE] LabanMovementAnalysis.preprocess called")
|
| 94 |
"""
|
| 95 |
Preprocess input from the frontend.
|
| 96 |
|
|
|
|
| 128 |
return options
|
| 129 |
|
| 130 |
def postprocess(self, value: Any) -> Dict[str, Any]:
|
| 131 |
+
print("[TRACE] LabanMovementAnalysis.postprocess called")
|
| 132 |
"""
|
| 133 |
Postprocess analysis results for the frontend.
|
| 134 |
|
|
|
|
| 156 |
def process_video(self, video_input: Union[str, os.PathLike], model: str = DEFAULT_MODEL,
|
| 157 |
enable_visualization: bool = True,
|
| 158 |
include_keypoints: bool = False) -> Tuple[Dict[str, Any], Optional[str]]:
|
| 159 |
+
print(f"[TRACE] LabanMovementAnalysis.process_video called with model={model}, enable_visualization={enable_visualization}, include_keypoints={include_keypoints}")
|
| 160 |
"""
|
| 161 |
Main processing function that performs pose analysis on a video.
|
| 162 |
|
|
|
|
| 270 |
return json_output, visualization_path
|
| 271 |
|
| 272 |
def __call__(self, video_path: str, **kwargs) -> Tuple[Dict[str, Any], Optional[str]]:
|
| 273 |
+
print(f"[TRACE] LabanMovementAnalysis.__call__ called with video_path={video_path}")
|
| 274 |
"""
|
| 275 |
Make the component callable for easy use.
|
| 276 |
|
|
|
|
| 290 |
# will be available in the next major release
|
| 291 |
|
| 292 |
def cleanup(self):
|
| 293 |
+
print("[TRACE] LabanMovementAnalysis.cleanup called")
|
| 294 |
"""Clean up temporary files and resources."""
|
| 295 |
# Clean up video input handler
|
| 296 |
if hasattr(self, 'video_input'):
|
| 297 |
self.video_input.cleanup()
|
| 298 |
|
| 299 |
def example_payload(self) -> Dict[str, Any]:
|
| 300 |
+
print("[TRACE] LabanMovementAnalysis.example_payload called")
|
| 301 |
"""Example input payload for documentation."""
|
| 302 |
return {
|
| 303 |
"video": {"path": "/path/to/video.mp4"},
|
|
|
|
| 307 |
}
|
| 308 |
|
| 309 |
def example_value(self) -> Dict[str, Any]:
|
| 310 |
+
print("[TRACE] LabanMovementAnalysis.example_value called")
|
| 311 |
"""Example output value for documentation."""
|
| 312 |
return {
|
| 313 |
"json_output": {
|
|
|
|
| 356 |
}
|
| 357 |
|
| 358 |
def api_info(self) -> Dict[str, Any]:
|
| 359 |
+
print("[TRACE] LabanMovementAnalysis.api_info called")
|
| 360 |
"""API information for the component."""
|
| 361 |
return {
|
| 362 |
"type": "composite",
|
backend/gradio_labanmovementanalysis/pose_estimation.py
CHANGED
|
@@ -7,6 +7,7 @@ from abc import ABC, abstractmethod
|
|
| 7 |
import numpy as np
|
| 8 |
from typing import List, Dict, Optional, Tuple, Any
|
| 9 |
from dataclasses import dataclass
|
|
|
|
| 10 |
|
| 11 |
|
| 12 |
@dataclass
|
|
@@ -136,14 +137,19 @@ class MoveNetPoseEstimator(PoseEstimator):
|
|
| 136 |
# Convert to our format
|
| 137 |
keypoints = []
|
| 138 |
for i, (y, x, score) in enumerate(keypoints_with_scores):
|
|
|
|
|
|
|
|
|
|
| 139 |
keypoints.append(Keypoint(
|
| 140 |
x=float(x),
|
| 141 |
y=float(y),
|
| 142 |
confidence=float(score),
|
| 143 |
name=self.KEYPOINT_NAMES[i]
|
| 144 |
))
|
| 145 |
-
|
| 146 |
-
|
|
|
|
|
|
|
| 147 |
|
| 148 |
def get_keypoint_names(self) -> List[str]:
|
| 149 |
return self.KEYPOINT_NAMES.copy()
|
|
@@ -239,7 +245,7 @@ class YOLOPoseEstimator(PoseEstimator):
|
|
| 239 |
"left_knee", "right_knee", "left_ankle", "right_ankle"
|
| 240 |
]
|
| 241 |
|
| 242 |
-
def __init__(self, model_version: str = "v11", model_size: str = "n", confidence_threshold: float = 0.
|
| 243 |
"""
|
| 244 |
Initialize YOLO pose model.
|
| 245 |
|
|
@@ -276,7 +282,7 @@ class YOLOPoseEstimator(PoseEstimator):
|
|
| 276 |
self._load_model()
|
| 277 |
|
| 278 |
# Run inference
|
| 279 |
-
results = self.model(frame, conf=self.confidence_threshold)
|
| 280 |
|
| 281 |
pose_results = []
|
| 282 |
|
|
@@ -289,6 +295,9 @@ class YOLOPoseEstimator(PoseEstimator):
|
|
| 289 |
# YOLO returns keypoints as [x, y, conf]
|
| 290 |
height, width = frame.shape[:2]
|
| 291 |
for i, (x, y, conf) in enumerate(keypoints_data):
|
|
|
|
|
|
|
|
|
|
| 292 |
keypoints.append(Keypoint(
|
| 293 |
x=float(x) / width, # Normalize to 0-1
|
| 294 |
y=float(y) / height, # Normalize to 0-1
|
|
@@ -296,11 +305,12 @@ class YOLOPoseEstimator(PoseEstimator):
|
|
| 296 |
name=self.KEYPOINT_NAMES[i] if i < len(self.KEYPOINT_NAMES) else f"joint_{i}"
|
| 297 |
))
|
| 298 |
|
| 299 |
-
|
| 300 |
-
|
| 301 |
-
|
| 302 |
-
|
| 303 |
-
|
|
|
|
| 304 |
|
| 305 |
return pose_results
|
| 306 |
|
|
@@ -377,4 +387,38 @@ def get_pose_estimator(model_spec: str) -> PoseEstimator:
|
|
| 377 |
|
| 378 |
# Legacy format support
|
| 379 |
else:
|
| 380 |
-
return create_pose_estimator(model_spec)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 7 |
import numpy as np
|
| 8 |
from typing import List, Dict, Optional, Tuple, Any
|
| 9 |
from dataclasses import dataclass
|
| 10 |
+
import math
|
| 11 |
|
| 12 |
|
| 13 |
@dataclass
|
|
|
|
| 137 |
# Convert to our format
|
| 138 |
keypoints = []
|
| 139 |
for i, (y, x, score) in enumerate(keypoints_with_scores):
|
| 140 |
+
# Sanitize NaN values
|
| 141 |
+
if any(map(math.isnan, [x, y, score])):
|
| 142 |
+
continue
|
| 143 |
keypoints.append(Keypoint(
|
| 144 |
x=float(x),
|
| 145 |
y=float(y),
|
| 146 |
confidence=float(score),
|
| 147 |
name=self.KEYPOINT_NAMES[i]
|
| 148 |
))
|
| 149 |
+
if keypoints:
|
| 150 |
+
return [PoseResult(keypoints=keypoints, frame_index=0)]
|
| 151 |
+
else:
|
| 152 |
+
return []
|
| 153 |
|
| 154 |
def get_keypoint_names(self) -> List[str]:
|
| 155 |
return self.KEYPOINT_NAMES.copy()
|
|
|
|
| 245 |
"left_knee", "right_knee", "left_ankle", "right_ankle"
|
| 246 |
]
|
| 247 |
|
| 248 |
+
def __init__(self, model_version: str = "v11", model_size: str = "n", confidence_threshold: float = 0.15):
|
| 249 |
"""
|
| 250 |
Initialize YOLO pose model.
|
| 251 |
|
|
|
|
| 282 |
self._load_model()
|
| 283 |
|
| 284 |
# Run inference
|
| 285 |
+
results = self.model(frame, conf=self.confidence_threshold, iou=0.7)
|
| 286 |
|
| 287 |
pose_results = []
|
| 288 |
|
|
|
|
| 295 |
# YOLO returns keypoints as [x, y, conf]
|
| 296 |
height, width = frame.shape[:2]
|
| 297 |
for i, (x, y, conf) in enumerate(keypoints_data):
|
| 298 |
+
# Sanitize NaN values
|
| 299 |
+
if any(map(math.isnan, [x, y, conf])):
|
| 300 |
+
continue
|
| 301 |
keypoints.append(Keypoint(
|
| 302 |
x=float(x) / width, # Normalize to 0-1
|
| 303 |
y=float(y) / height, # Normalize to 0-1
|
|
|
|
| 305 |
name=self.KEYPOINT_NAMES[i] if i < len(self.KEYPOINT_NAMES) else f"joint_{i}"
|
| 306 |
))
|
| 307 |
|
| 308 |
+
if keypoints:
|
| 309 |
+
pose_results.append(PoseResult(
|
| 310 |
+
keypoints=keypoints,
|
| 311 |
+
frame_index=0,
|
| 312 |
+
person_id=person_idx
|
| 313 |
+
))
|
| 314 |
|
| 315 |
return pose_results
|
| 316 |
|
|
|
|
| 387 |
|
| 388 |
# Legacy format support
|
| 389 |
else:
|
| 390 |
+
return create_pose_estimator(model_spec)
|
| 391 |
+
|
| 392 |
+
|
| 393 |
+
def _safe_pose_from_dets(dets: List[PoseResult], frame_idx: int) -> List[PoseResult]:
|
| 394 |
+
"""
|
| 395 |
+
Helper function to fill missing poses with the previous pose and keep a missing_mask.
|
| 396 |
+
After the loop, interpolate missing poses in pose_seq before running metrics.
|
| 397 |
+
Add debug prints when a pose is missing and when interpolation is performed.
|
| 398 |
+
"""
|
| 399 |
+
safe_poses = []
|
| 400 |
+
missing_mask = []
|
| 401 |
+
prev_pose = None
|
| 402 |
+
|
| 403 |
+
for det in dets:
|
| 404 |
+
if det.frame_index == frame_idx:
|
| 405 |
+
if prev_pose is None:
|
| 406 |
+
print(f"Warning: No previous pose found for frame {frame_idx}")
|
| 407 |
+
safe_poses.append(PoseResult(keypoints=[], frame_index=frame_idx))
|
| 408 |
+
missing_mask.append(True)
|
| 409 |
+
else:
|
| 410 |
+
safe_poses.append(PoseResult(keypoints=prev_pose.keypoints, frame_index=frame_idx))
|
| 411 |
+
missing_mask.append(False)
|
| 412 |
+
prev_pose = det
|
| 413 |
+
elif det.frame_index > frame_idx:
|
| 414 |
+
break
|
| 415 |
+
|
| 416 |
+
if prev_pose is None:
|
| 417 |
+
print(f"Warning: No poses found for frame {frame_idx}")
|
| 418 |
+
safe_poses.append(PoseResult(keypoints=[], frame_index=frame_idx))
|
| 419 |
+
missing_mask.append(True)
|
| 420 |
+
else:
|
| 421 |
+
safe_poses.append(PoseResult(keypoints=prev_pose.keypoints, frame_index=frame_idx))
|
| 422 |
+
missing_mask.append(False)
|
| 423 |
+
|
| 424 |
+
return safe_poses, missing_mask
|
backend/gradio_labanmovementanalysis/video_utils.py
CHANGED
|
@@ -52,6 +52,9 @@ def get_video_info(video_path: str) -> Tuple[int, float, Tuple[int, int]]:
|
|
| 52 |
fps = cap.get(cv2.CAP_PROP_FPS)
|
| 53 |
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
| 54 |
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
|
|
|
|
|
|
|
|
| 55 |
return frame_count, fps, (width, height)
|
| 56 |
finally:
|
| 57 |
cap.release()
|
|
|
|
| 52 |
fps = cap.get(cv2.CAP_PROP_FPS)
|
| 53 |
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
| 54 |
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
| 55 |
+
print("DEBUG: fps =", fps, "frames =", frame_count)
|
| 56 |
+
if np.isnan(fps) or fps <= 0:
|
| 57 |
+
raise ValueError("FPS came back NaN/0 – did the capture fail?")
|
| 58 |
return frame_count, fps, (width, height)
|
| 59 |
finally:
|
| 60 |
cap.release()
|
backend/gradio_labanmovementanalysis/visualizer.py
CHANGED
|
@@ -7,6 +7,8 @@ import numpy as np
|
|
| 7 |
from typing import List, Tuple, Optional, Dict, Any
|
| 8 |
from collections import deque
|
| 9 |
import colorsys
|
|
|
|
|
|
|
| 10 |
|
| 11 |
from .pose_estimation import PoseResult, Keypoint
|
| 12 |
from .notation_engine import MovementMetrics, Direction, Intensity, Speed
|
|
@@ -66,6 +68,7 @@ class PoseVisualizer:
|
|
| 66 |
show_direction_arrows: Whether to show movement direction arrows
|
| 67 |
show_metrics: Whether to display text metrics on frame
|
| 68 |
"""
|
|
|
|
| 69 |
self.trail_length = trail_length
|
| 70 |
self.show_skeleton = show_skeleton
|
| 71 |
self.show_trails = show_trails
|
|
@@ -99,11 +102,18 @@ class PoseVisualizer:
|
|
| 99 |
Returns:
|
| 100 |
Annotated frame
|
| 101 |
"""
|
| 102 |
-
|
| 103 |
vis_frame = frame.copy()
|
|
|
|
|
|
|
|
|
|
| 104 |
|
| 105 |
# Draw for each detected person
|
| 106 |
for person_idx, pose in enumerate(pose_results):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 107 |
# Update trails
|
| 108 |
if self.show_trails:
|
| 109 |
self._update_trails(pose, person_idx)
|
|
@@ -146,26 +156,30 @@ class PoseVisualizer:
|
|
| 146 |
Returns:
|
| 147 |
Path to created video
|
| 148 |
"""
|
| 149 |
-
|
| 150 |
-
|
| 151 |
-
|
| 152 |
-
|
| 153 |
-
|
| 154 |
-
|
| 155 |
-
|
| 156 |
-
|
| 157 |
-
|
| 158 |
-
|
| 159 |
-
|
| 160 |
-
|
| 161 |
-
annotated_frames
|
| 162 |
-
|
| 163 |
-
|
| 164 |
-
|
| 165 |
-
|
|
|
|
|
|
|
|
|
|
| 166 |
|
| 167 |
def _update_trails(self, pose: PoseResult, person_id: int):
|
| 168 |
"""Update motion trails for a person."""
|
|
|
|
| 169 |
if person_id not in self.trails:
|
| 170 |
self.trails[person_id] = {}
|
| 171 |
|
|
@@ -182,6 +196,7 @@ class PoseVisualizer:
|
|
| 182 |
|
| 183 |
def _draw_trails(self, frame: np.ndarray, person_id: int):
|
| 184 |
"""Draw motion trails for a person."""
|
|
|
|
| 185 |
if person_id not in self.trails:
|
| 186 |
return
|
| 187 |
|
|
@@ -193,19 +208,25 @@ class PoseVisualizer:
|
|
| 193 |
|
| 194 |
# Draw trail with fading effect
|
| 195 |
for i in range(1, len(trail)):
|
| 196 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 197 |
alpha = i / len(trail)
|
| 198 |
color = tuple(int(c * alpha) for c in (255, 255, 255))
|
| 199 |
|
| 200 |
# Convert normalized to pixel coordinates
|
| 201 |
-
pt1 = (int(
|
| 202 |
-
pt2 = (int(
|
| 203 |
|
| 204 |
# Draw trail segment
|
| 205 |
cv2.line(frame, pt1, pt2, color, thickness=max(1, int(3 * alpha)))
|
| 206 |
|
| 207 |
def _draw_skeleton(self, frame: np.ndarray, pose: PoseResult, color: Tuple[int, int, int]):
|
| 208 |
"""Draw pose skeleton."""
|
|
|
|
| 209 |
h, w = frame.shape[:2]
|
| 210 |
|
| 211 |
# Create keypoint lookup
|
|
@@ -229,6 +250,11 @@ class PoseVisualizer:
|
|
| 229 |
kp1 = kp_dict[name1]
|
| 230 |
kp2 = kp_dict[name2]
|
| 231 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 232 |
# Convert to pixel coordinates
|
| 233 |
pt1 = (int(kp1.x * w), int(kp1.y * h))
|
| 234 |
pt2 = (int(kp2.x * w), int(kp2.y * h))
|
|
@@ -239,12 +265,18 @@ class PoseVisualizer:
|
|
| 239 |
def _draw_keypoints(self, frame: np.ndarray, pose: PoseResult,
|
| 240 |
metrics: Optional[MovementMetrics] = None):
|
| 241 |
"""Draw individual keypoints."""
|
|
|
|
| 242 |
h, w = frame.shape[:2]
|
| 243 |
|
| 244 |
for kp in pose.keypoints:
|
| 245 |
if kp.confidence < 0.3:
|
| 246 |
continue
|
| 247 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 248 |
# Convert to pixel coordinates
|
| 249 |
pt = (int(kp.x * w), int(kp.y * h))
|
| 250 |
|
|
@@ -258,16 +290,24 @@ class PoseVisualizer:
|
|
| 258 |
def _draw_direction_arrow(self, frame: np.ndarray, pose: PoseResult,
|
| 259 |
metrics: MovementMetrics):
|
| 260 |
"""Draw arrow indicating movement direction."""
|
|
|
|
| 261 |
if metrics.direction == Direction.STATIONARY:
|
| 262 |
return
|
| 263 |
|
| 264 |
h, w = frame.shape[:2]
|
| 265 |
|
| 266 |
# Get body center
|
| 267 |
-
|
| 268 |
-
|
| 269 |
-
|
| 270 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 271 |
center = (int(center_x * w), int(center_y * h))
|
| 272 |
|
| 273 |
# Calculate arrow endpoint based on direction
|
|
@@ -294,18 +334,29 @@ class PoseVisualizer:
|
|
| 294 |
|
| 295 |
def _draw_metrics_overlay(self, frame: np.ndarray, metrics: MovementMetrics):
|
| 296 |
"""Draw text overlay with movement metrics."""
|
|
|
|
| 297 |
# Define text properties
|
| 298 |
font = cv2.FONT_HERSHEY_SIMPLEX
|
| 299 |
font_scale = 0.6
|
| 300 |
thickness = 2
|
| 301 |
|
| 302 |
-
#
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 303 |
lines = [
|
| 304 |
f"Direction: {metrics.direction.value}",
|
| 305 |
-
f"Speed: {metrics.speed.value} ({
|
| 306 |
f"Intensity: {metrics.intensity.value}",
|
| 307 |
-
f"Fluidity: {
|
| 308 |
-
f"Expansion: {
|
| 309 |
]
|
| 310 |
|
| 311 |
# Draw background rectangle
|
|
@@ -330,6 +381,7 @@ class PoseVisualizer:
|
|
| 330 |
|
| 331 |
def _get_color_for_metrics(self, metrics: Optional[MovementMetrics]) -> Tuple[int, int, int]:
|
| 332 |
"""Get color based on movement metrics."""
|
|
|
|
| 333 |
if metrics is None:
|
| 334 |
return (255, 255, 255) # White default
|
| 335 |
|
|
@@ -337,6 +389,7 @@ class PoseVisualizer:
|
|
| 337 |
|
| 338 |
def _confidence_to_color(self, confidence: float) -> Tuple[int, int, int]:
|
| 339 |
"""Convert confidence score to color (green=high, red=low)."""
|
|
|
|
| 340 |
# Use HSV color space for smooth gradient
|
| 341 |
hue = confidence * 120 # 0=red, 120=green
|
| 342 |
rgb = colorsys.hsv_to_rgb(hue / 360, 1.0, 1.0)
|
|
@@ -344,6 +397,7 @@ class PoseVisualizer:
|
|
| 344 |
|
| 345 |
def _get_skeleton_for_model(self, keypoints: List[Keypoint]) -> List[Tuple[int, int]]:
|
| 346 |
"""Determine which skeleton definition to use based on keypoints."""
|
|
|
|
| 347 |
# Simple heuristic: if we have more than 20 keypoints, use MediaPipe skeleton
|
| 348 |
if len(keypoints) > 20:
|
| 349 |
return self.MEDIAPIPE_SKELETON
|
|
@@ -351,6 +405,7 @@ class PoseVisualizer:
|
|
| 351 |
|
| 352 |
def _get_keypoint_names_for_model(self, keypoints: List[Keypoint]) -> List[str]:
|
| 353 |
"""Get ordered list of keypoint names for the model."""
|
|
|
|
| 354 |
# If keypoints have names, use them
|
| 355 |
if keypoints and keypoints[0].name:
|
| 356 |
return [kp.name for kp in keypoints]
|
|
|
|
| 7 |
from typing import List, Tuple, Optional, Dict, Any
|
| 8 |
from collections import deque
|
| 9 |
import colorsys
|
| 10 |
+
import math
|
| 11 |
+
import traceback
|
| 12 |
|
| 13 |
from .pose_estimation import PoseResult, Keypoint
|
| 14 |
from .notation_engine import MovementMetrics, Direction, Intensity, Speed
|
|
|
|
| 68 |
show_direction_arrows: Whether to show movement direction arrows
|
| 69 |
show_metrics: Whether to display text metrics on frame
|
| 70 |
"""
|
| 71 |
+
print("[TRACE] PoseVisualizer.__init__ called")
|
| 72 |
self.trail_length = trail_length
|
| 73 |
self.show_skeleton = show_skeleton
|
| 74 |
self.show_trails = show_trails
|
|
|
|
| 102 |
Returns:
|
| 103 |
Annotated frame
|
| 104 |
"""
|
| 105 |
+
print(f"[TRACE] visualize_frame(frame_index={frame_index}, num_poses={len(pose_results)})")
|
| 106 |
vis_frame = frame.copy()
|
| 107 |
+
if not pose_results:
|
| 108 |
+
print(f"[DEBUG] Frame {frame_index}: No pose results (no detections)")
|
| 109 |
+
return vis_frame # No people detected, return original frame
|
| 110 |
|
| 111 |
# Draw for each detected person
|
| 112 |
for person_idx, pose in enumerate(pose_results):
|
| 113 |
+
if not pose.keypoints or all(math.isnan(kp.x) or math.isnan(kp.y) for kp in pose.keypoints):
|
| 114 |
+
print(f"[DEBUG] Frame {frame_index}, Person {person_idx}: All keypoints are NaN or missing, skipping drawing for this person.")
|
| 115 |
+
continue
|
| 116 |
+
|
| 117 |
# Update trails
|
| 118 |
if self.show_trails:
|
| 119 |
self._update_trails(pose, person_idx)
|
|
|
|
| 156 |
Returns:
|
| 157 |
Path to created video
|
| 158 |
"""
|
| 159 |
+
print(f"[TRACE] generate_overlay_video(num_frames={len(frames)})")
|
| 160 |
+
try:
|
| 161 |
+
if len(frames) != len(all_pose_results) or len(frames) != len(all_movement_metrics):
|
| 162 |
+
raise ValueError("Mismatched lengths between frames, poses, and metrics")
|
| 163 |
+
self.trails = {}
|
| 164 |
+
annotated_frames = []
|
| 165 |
+
for i, (frame, poses, metrics) in enumerate(
|
| 166 |
+
zip(frames, all_pose_results, all_movement_metrics)
|
| 167 |
+
):
|
| 168 |
+
annotated_frame = self.visualize_frame(frame, poses, metrics, i)
|
| 169 |
+
annotated_frames.append(annotated_frame)
|
| 170 |
+
print(f"[DEBUG] About to assemble video with {len(annotated_frames)} frames.")
|
| 171 |
+
for idx, f in enumerate(annotated_frames):
|
| 172 |
+
print(f"[DEBUG] Frame {idx} shape: {f.shape if hasattr(f, 'shape') else type(f)}")
|
| 173 |
+
from . import video_utils
|
| 174 |
+
return video_utils.assemble_video(annotated_frames, output_path, fps)
|
| 175 |
+
except Exception as e:
|
| 176 |
+
print(f'[FATAL ERROR] Exception during visualization: {e}')
|
| 177 |
+
print(traceback.format_exc())
|
| 178 |
+
raise
|
| 179 |
|
| 180 |
def _update_trails(self, pose: PoseResult, person_id: int):
|
| 181 |
"""Update motion trails for a person."""
|
| 182 |
+
print(f"[TRACE] _update_trails(person_id={person_id})")
|
| 183 |
if person_id not in self.trails:
|
| 184 |
self.trails[person_id] = {}
|
| 185 |
|
|
|
|
| 196 |
|
| 197 |
def _draw_trails(self, frame: np.ndarray, person_id: int):
|
| 198 |
"""Draw motion trails for a person."""
|
| 199 |
+
print(f"[TRACE] _draw_trails(person_id={person_id})")
|
| 200 |
if person_id not in self.trails:
|
| 201 |
return
|
| 202 |
|
|
|
|
| 208 |
|
| 209 |
# Draw trail with fading effect
|
| 210 |
for i in range(1, len(trail)):
|
| 211 |
+
x1, y1 = trail[i-1]
|
| 212 |
+
x2, y2 = trail[i]
|
| 213 |
+
if math.isnan(x1) or math.isnan(y1) or math.isnan(x2) or math.isnan(y2):
|
| 214 |
+
print(f"[DEBUG] _draw_trails: NaN detected in trail for person {person_id}, joint {joint_name}, skipping segment.")
|
| 215 |
+
continue
|
| 216 |
+
print(f"[DEBUG] _draw_trails: Converting to int: ({x1}*{w}, {y1}*{h}) and ({x2}*{w}, {y2}*{h})")
|
| 217 |
alpha = i / len(trail)
|
| 218 |
color = tuple(int(c * alpha) for c in (255, 255, 255))
|
| 219 |
|
| 220 |
# Convert normalized to pixel coordinates
|
| 221 |
+
pt1 = (int(x1 * w), int(y1 * h))
|
| 222 |
+
pt2 = (int(x2 * w), int(y2 * h))
|
| 223 |
|
| 224 |
# Draw trail segment
|
| 225 |
cv2.line(frame, pt1, pt2, color, thickness=max(1, int(3 * alpha)))
|
| 226 |
|
| 227 |
def _draw_skeleton(self, frame: np.ndarray, pose: PoseResult, color: Tuple[int, int, int]):
|
| 228 |
"""Draw pose skeleton."""
|
| 229 |
+
print(f"[TRACE] _draw_skeleton()")
|
| 230 |
h, w = frame.shape[:2]
|
| 231 |
|
| 232 |
# Create keypoint lookup
|
|
|
|
| 250 |
kp1 = kp_dict[name1]
|
| 251 |
kp2 = kp_dict[name2]
|
| 252 |
|
| 253 |
+
if math.isnan(kp1.x) or math.isnan(kp1.y) or math.isnan(kp2.x) or math.isnan(kp2.y):
|
| 254 |
+
print(f"[DEBUG] _draw_skeleton: NaN detected in skeleton for keypoints {name1}, {name2}, skipping line.")
|
| 255 |
+
continue
|
| 256 |
+
|
| 257 |
+
print(f"[DEBUG] _draw_skeleton: Converting to int: ({kp1.x}*{w}, {kp1.y}*{h}) and ({kp2.x}*{w}, {kp2.y}*{h})")
|
| 258 |
# Convert to pixel coordinates
|
| 259 |
pt1 = (int(kp1.x * w), int(kp1.y * h))
|
| 260 |
pt2 = (int(kp2.x * w), int(kp2.y * h))
|
|
|
|
| 265 |
def _draw_keypoints(self, frame: np.ndarray, pose: PoseResult,
|
| 266 |
metrics: Optional[MovementMetrics] = None):
|
| 267 |
"""Draw individual keypoints."""
|
| 268 |
+
print(f"[TRACE] _draw_keypoints()")
|
| 269 |
h, w = frame.shape[:2]
|
| 270 |
|
| 271 |
for kp in pose.keypoints:
|
| 272 |
if kp.confidence < 0.3:
|
| 273 |
continue
|
| 274 |
|
| 275 |
+
if math.isnan(kp.x) or math.isnan(kp.y):
|
| 276 |
+
print(f"[DEBUG] _draw_keypoints: NaN detected in keypoint {kp.name}, skipping.")
|
| 277 |
+
continue
|
| 278 |
+
|
| 279 |
+
print(f"[DEBUG] _draw_keypoints: Converting to int: ({kp.x}*{w}, {kp.y}*{h})")
|
| 280 |
# Convert to pixel coordinates
|
| 281 |
pt = (int(kp.x * w), int(kp.y * h))
|
| 282 |
|
|
|
|
| 290 |
def _draw_direction_arrow(self, frame: np.ndarray, pose: PoseResult,
|
| 291 |
metrics: MovementMetrics):
|
| 292 |
"""Draw arrow indicating movement direction."""
|
| 293 |
+
print(f"[TRACE] _draw_direction_arrow()")
|
| 294 |
if metrics.direction == Direction.STATIONARY:
|
| 295 |
return
|
| 296 |
|
| 297 |
h, w = frame.shape[:2]
|
| 298 |
|
| 299 |
# Get body center
|
| 300 |
+
xs = [kp.x for kp in pose.keypoints if kp.confidence > 0.3 and not math.isnan(kp.x)]
|
| 301 |
+
ys = [kp.y for kp in pose.keypoints if kp.confidence > 0.3 and not math.isnan(kp.y)]
|
| 302 |
+
if not xs or not ys:
|
| 303 |
+
print(f"[DEBUG] _draw_direction_arrow: No valid keypoints for direction arrow, skipping.")
|
| 304 |
+
return
|
| 305 |
+
print(f"[DEBUG] _draw_direction_arrow: Converting to int: ({np.mean(xs)}*{w}, {np.mean(ys)}*{h})")
|
| 306 |
+
center_x = np.mean(xs)
|
| 307 |
+
center_y = np.mean(ys)
|
| 308 |
+
if math.isnan(center_x) or math.isnan(center_y):
|
| 309 |
+
print(f"[DEBUG] _draw_direction_arrow: NaN detected in center, skipping arrow.")
|
| 310 |
+
return
|
| 311 |
center = (int(center_x * w), int(center_y * h))
|
| 312 |
|
| 313 |
# Calculate arrow endpoint based on direction
|
|
|
|
| 334 |
|
| 335 |
def _draw_metrics_overlay(self, frame: np.ndarray, metrics: MovementMetrics):
|
| 336 |
"""Draw text overlay with movement metrics."""
|
| 337 |
+
print(f"[TRACE] _draw_metrics_overlay()")
|
| 338 |
# Define text properties
|
| 339 |
font = cv2.FONT_HERSHEY_SIMPLEX
|
| 340 |
font_scale = 0.6
|
| 341 |
thickness = 2
|
| 342 |
|
| 343 |
+
# Sanitize metrics
|
| 344 |
+
def safe_metric(val, name):
|
| 345 |
+
if isinstance(val, float) and math.isnan(val):
|
| 346 |
+
print(f"[DEBUG] _draw_metrics_overlay: {name} is NaN, replacing with 0.0")
|
| 347 |
+
return 0.0
|
| 348 |
+
return val
|
| 349 |
+
|
| 350 |
+
velocity = safe_metric(metrics.velocity, "velocity")
|
| 351 |
+
fluidity = safe_metric(metrics.fluidity, "fluidity")
|
| 352 |
+
expansion = safe_metric(metrics.expansion, "expansion")
|
| 353 |
+
|
| 354 |
lines = [
|
| 355 |
f"Direction: {metrics.direction.value}",
|
| 356 |
+
f"Speed: {metrics.speed.value} ({velocity:.2f})",
|
| 357 |
f"Intensity: {metrics.intensity.value}",
|
| 358 |
+
f"Fluidity: {fluidity:.2f}",
|
| 359 |
+
f"Expansion: {expansion:.2f}"
|
| 360 |
]
|
| 361 |
|
| 362 |
# Draw background rectangle
|
|
|
|
| 381 |
|
| 382 |
def _get_color_for_metrics(self, metrics: Optional[MovementMetrics]) -> Tuple[int, int, int]:
|
| 383 |
"""Get color based on movement metrics."""
|
| 384 |
+
print(f"[TRACE] _get_color_for_metrics()")
|
| 385 |
if metrics is None:
|
| 386 |
return (255, 255, 255) # White default
|
| 387 |
|
|
|
|
| 389 |
|
| 390 |
def _confidence_to_color(self, confidence: float) -> Tuple[int, int, int]:
|
| 391 |
"""Convert confidence score to color (green=high, red=low)."""
|
| 392 |
+
print(f"[TRACE] _confidence_to_color(confidence={confidence})")
|
| 393 |
# Use HSV color space for smooth gradient
|
| 394 |
hue = confidence * 120 # 0=red, 120=green
|
| 395 |
rgb = colorsys.hsv_to_rgb(hue / 360, 1.0, 1.0)
|
|
|
|
| 397 |
|
| 398 |
def _get_skeleton_for_model(self, keypoints: List[Keypoint]) -> List[Tuple[int, int]]:
|
| 399 |
"""Determine which skeleton definition to use based on keypoints."""
|
| 400 |
+
print(f"[TRACE] _get_skeleton_for_model(num_keypoints={len(keypoints)})")
|
| 401 |
# Simple heuristic: if we have more than 20 keypoints, use MediaPipe skeleton
|
| 402 |
if len(keypoints) > 20:
|
| 403 |
return self.MEDIAPIPE_SKELETON
|
|
|
|
| 405 |
|
| 406 |
def _get_keypoint_names_for_model(self, keypoints: List[Keypoint]) -> List[str]:
|
| 407 |
"""Get ordered list of keypoint names for the model."""
|
| 408 |
+
print(f"[TRACE] _get_keypoint_names_for_model(num_keypoints={len(keypoints)})")
|
| 409 |
# If keypoints have names, use them
|
| 410 |
if keypoints and keypoints[0].name:
|
| 411 |
return [kp.name for kp in keypoints]
|
demo/app.py
CHANGED
|
@@ -6,7 +6,8 @@ Author: Csaba (BladeSzaSza)
|
|
| 6 |
|
| 7 |
import gradio as gr
|
| 8 |
import os
|
| 9 |
-
from backend.gradio_labanmovementanalysis import LabanMovementAnalysis
|
|
|
|
| 10 |
|
| 11 |
# Import agent API if available
|
| 12 |
# Initialize agent API if available
|
|
@@ -144,6 +145,7 @@ def create_demo() -> gr.Blocks:
|
|
| 144 |
label="Examples"
|
| 145 |
)
|
| 146 |
|
|
|
|
| 147 |
# Output column
|
| 148 |
with gr.Column(scale=2, min_width=320):
|
| 149 |
viz_out = gr.Video(label="Annotated Video", scale=1, height=400)
|
|
|
|
| 6 |
|
| 7 |
import gradio as gr
|
| 8 |
import os
|
| 9 |
+
# from backend.gradio_labanmovementanalysis import LabanMovementAnalysis
|
| 10 |
+
from gradio_labanmovementanalysis import LabanMovementAnalysis
|
| 11 |
|
| 12 |
# Import agent API if available
|
| 13 |
# Initialize agent API if available
|
|
|
|
| 145 |
label="Examples"
|
| 146 |
)
|
| 147 |
|
| 148 |
+
|
| 149 |
# Output column
|
| 150 |
with gr.Column(scale=2, min_width=320):
|
| 151 |
viz_out = gr.Video(label="Annotated Video", scale=1, height=400)
|
yolo11n-pose.pt
ADDED
|
@@ -0,0 +1,3 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
version https://git-lfs.github.com/spec/v1
|
| 2 |
+
oid sha256:869e83fcdffdc7371fa4e34cd8e51c838cc729571d1635e5141e3075e9319dc0
|
| 3 |
+
size 6255593
|
yolov8n-pose.pt
ADDED
|
@@ -0,0 +1,3 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
version https://git-lfs.github.com/spec/v1
|
| 2 |
+
oid sha256:c6fa93dd1ee4a2c18c900a45c1d864a1c6f7aba75d84f91648a30b7fb641d212
|
| 3 |
+
size 6832633
|