|
|
""" |
|
|
Laban Movement Analysis (LMA) inspired notation engine. |
|
|
Computes movement metrics like direction, intensity, and speed from pose keypoints. |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
from typing import List, Dict, Optional, Tuple, Any |
|
|
from dataclasses import dataclass |
|
|
from enum import Enum |
|
|
|
|
|
from .pose_estimation import PoseResult, Keypoint |
|
|
|
|
|
|
|
|
class Direction(Enum): |
|
|
"""Movement direction categories.""" |
|
|
UP = "up" |
|
|
DOWN = "down" |
|
|
LEFT = "left" |
|
|
RIGHT = "right" |
|
|
FORWARD = "forward" |
|
|
BACKWARD = "backward" |
|
|
STATIONARY = "stationary" |
|
|
|
|
|
|
|
|
class Intensity(Enum): |
|
|
"""Movement intensity levels.""" |
|
|
LOW = "low" |
|
|
MEDIUM = "medium" |
|
|
HIGH = "high" |
|
|
|
|
|
|
|
|
class Speed(Enum): |
|
|
"""Movement speed categories.""" |
|
|
SLOW = "slow" |
|
|
MODERATE = "moderate" |
|
|
FAST = "fast" |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class MovementMetrics: |
|
|
"""LMA-inspired movement metrics for a frame or segment.""" |
|
|
frame_index: int |
|
|
timestamp: Optional[float] = None |
|
|
|
|
|
|
|
|
direction: Direction = Direction.STATIONARY |
|
|
intensity: Intensity = Intensity.LOW |
|
|
speed: Speed = Speed.SLOW |
|
|
|
|
|
|
|
|
velocity: float = 0.0 |
|
|
acceleration: float = 0.0 |
|
|
|
|
|
|
|
|
fluidity: float = 0.0 |
|
|
expansion: float = 0.0 |
|
|
|
|
|
|
|
|
center_displacement: Optional[Tuple[float, float]] = None |
|
|
total_displacement: float = 0.0 |
|
|
|
|
|
|
|
|
class SimpleKalmanFilter: |
|
|
"""Lightweight Kalman filter for position/velocity tracking.""" |
|
|
|
|
|
def __init__(self, process_noise: float = 0.01, measurement_noise: float = 0.1): |
|
|
self.process_noise = process_noise |
|
|
self.measurement_noise = measurement_noise |
|
|
self.is_initialized = False |
|
|
|
|
|
|
|
|
self.state = np.zeros(4) |
|
|
self.covariance = np.eye(4) * 0.1 |
|
|
|
|
|
|
|
|
self.F = np.array([[1, 0, 1, 0], |
|
|
[0, 1, 0, 1], |
|
|
[0, 0, 1, 0], |
|
|
[0, 0, 0, 1]]) |
|
|
|
|
|
|
|
|
self.H = np.array([[1, 0, 0, 0], |
|
|
[0, 1, 0, 0]]) |
|
|
|
|
|
|
|
|
self.Q = np.eye(4) * process_noise |
|
|
|
|
|
|
|
|
self.R = np.eye(2) * measurement_noise |
|
|
|
|
|
def predict(self, dt: float = 1.0): |
|
|
"""Predict next state.""" |
|
|
|
|
|
self.F[0, 2] = dt |
|
|
self.F[1, 3] = dt |
|
|
|
|
|
|
|
|
self.state = self.F @ self.state |
|
|
self.covariance = self.F @ self.covariance @ self.F.T + self.Q |
|
|
|
|
|
def update(self, measurement: Tuple[float, float]): |
|
|
"""Update with measurement.""" |
|
|
z = np.array([measurement[0], measurement[1]]) |
|
|
|
|
|
if not self.is_initialized: |
|
|
self.state[:2] = z |
|
|
self.is_initialized = True |
|
|
return |
|
|
|
|
|
|
|
|
S = self.H @ self.covariance @ self.H.T + self.R |
|
|
K = self.covariance @ self.H.T @ np.linalg.inv(S) |
|
|
|
|
|
|
|
|
y = z - self.H @ self.state |
|
|
self.state = self.state + K @ y |
|
|
self.covariance = (np.eye(4) - K @ self.H) @ self.covariance |
|
|
|
|
|
def get_position(self) -> Tuple[float, float]: |
|
|
"""Get filtered position.""" |
|
|
return (self.state[0], self.state[1]) |
|
|
|
|
|
def get_velocity(self) -> Tuple[float, float]: |
|
|
"""Get filtered velocity.""" |
|
|
return (self.state[2], self.state[3]) |
|
|
|
|
|
|
|
|
class MovementAnalyzer: |
|
|
"""Analyzes pose sequences to extract LMA-style movement metrics.""" |
|
|
|
|
|
def __init__(self, fps: float = 30.0, |
|
|
velocity_threshold_slow: float = 0.01, |
|
|
velocity_threshold_fast: float = 0.1, |
|
|
intensity_accel_threshold: float = 0.05, |
|
|
use_kalman_filter: bool = True, |
|
|
use_adaptive_thresholds: bool = True): |
|
|
""" |
|
|
Initialize movement analyzer with advanced features. |
|
|
|
|
|
Args: |
|
|
fps: Frames per second of the video |
|
|
velocity_threshold_slow: Initial threshold for slow movement |
|
|
velocity_threshold_fast: Initial threshold for fast movement |
|
|
intensity_accel_threshold: Initial acceleration threshold for intensity |
|
|
use_kalman_filter: Whether to use Kalman filtering for tracking |
|
|
use_adaptive_thresholds: Whether to adapt thresholds based on video content |
|
|
""" |
|
|
self.fps = fps |
|
|
self.frame_duration = 1.0 / fps if fps > 0 else 0.0 |
|
|
self.velocity_threshold_slow = velocity_threshold_slow |
|
|
self.velocity_threshold_fast = velocity_threshold_fast |
|
|
self.intensity_accel_threshold = intensity_accel_threshold |
|
|
self.use_kalman_filter = use_kalman_filter |
|
|
self.use_adaptive_thresholds = use_adaptive_thresholds |
|
|
|
|
|
|
|
|
self.kalman_filter = SimpleKalmanFilter() if use_kalman_filter else None |
|
|
|
|
|
|
|
|
self.adaptive_thresholds_computed = False |
|
|
|
|
|
def analyze_movement(self, pose_sequence: List[List[PoseResult]]) -> List[MovementMetrics]: |
|
|
""" |
|
|
Analyze a sequence of poses to compute movement metrics. |
|
|
|
|
|
Args: |
|
|
pose_sequence: List of pose results per frame. Each inner list can contain |
|
|
multiple PoseResult if multiple people are detected. |
|
|
|
|
|
Returns: |
|
|
List of movement metrics per frame (currently for the first detected person). |
|
|
""" |
|
|
if not pose_sequence: |
|
|
return [] |
|
|
|
|
|
metrics = [] |
|
|
|
|
|
prev_centers = None |
|
|
prev_velocity = None |
|
|
|
|
|
for frame_idx, frame_poses in enumerate(pose_sequence): |
|
|
if not frame_poses: |
|
|
|
|
|
metrics.append(MovementMetrics( |
|
|
frame_index=frame_idx, |
|
|
timestamp=frame_idx * self.frame_duration if self.frame_duration else None |
|
|
)) |
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pose = frame_poses[0] |
|
|
|
|
|
|
|
|
if not pose.keypoints: |
|
|
metrics.append(MovementMetrics( |
|
|
frame_index=frame_idx, |
|
|
timestamp=frame_idx * self.frame_duration if self.frame_duration else None |
|
|
)) |
|
|
|
|
|
prev_centers = None |
|
|
prev_velocity = None |
|
|
continue |
|
|
|
|
|
|
|
|
center = self._compute_body_center_weighted(pose.keypoints) |
|
|
|
|
|
|
|
|
if self.kalman_filter: |
|
|
self.kalman_filter.predict(self.frame_duration) |
|
|
self.kalman_filter.update(center) |
|
|
center = self.kalman_filter.get_position() |
|
|
|
|
|
|
|
|
frame_metrics = MovementMetrics( |
|
|
frame_index=frame_idx, |
|
|
timestamp=frame_idx * self.frame_duration if self.frame_duration else None |
|
|
) |
|
|
|
|
|
|
|
|
if prev_centers is not None and frame_idx > 0 and self.fps > 0: |
|
|
displacement = ( |
|
|
center[0] - prev_centers[0], |
|
|
center[1] - prev_centers[1] |
|
|
) |
|
|
frame_metrics.center_displacement = displacement |
|
|
frame_metrics.total_displacement = np.sqrt( |
|
|
displacement[0]**2 + displacement[1]**2 |
|
|
) |
|
|
|
|
|
frame_metrics.velocity = frame_metrics.total_displacement * self.fps |
|
|
frame_metrics.direction = self._compute_direction(displacement) |
|
|
frame_metrics.speed = self._categorize_speed(frame_metrics.velocity) |
|
|
|
|
|
if prev_velocity is not None: |
|
|
|
|
|
delta_velocity = frame_metrics.velocity - prev_velocity |
|
|
frame_metrics.acceleration = delta_velocity * self.fps |
|
|
|
|
|
frame_metrics.intensity = self._compute_intensity( |
|
|
frame_metrics.acceleration, |
|
|
frame_metrics.velocity |
|
|
) |
|
|
frame_metrics.fluidity = self._compute_fluidity( |
|
|
frame_metrics.acceleration |
|
|
) |
|
|
|
|
|
frame_metrics.expansion = self._compute_expansion(pose.keypoints) |
|
|
metrics.append(frame_metrics) |
|
|
|
|
|
|
|
|
prev_centers = center |
|
|
prev_velocity = frame_metrics.velocity |
|
|
|
|
|
|
|
|
if self.use_adaptive_thresholds and not self.adaptive_thresholds_computed: |
|
|
self._compute_adaptive_thresholds(metrics) |
|
|
|
|
|
metrics = self._recompute_with_adaptive_thresholds(metrics) |
|
|
|
|
|
metrics = self._smooth_metrics(metrics) |
|
|
return metrics |
|
|
|
|
|
def _compute_body_center_weighted(self, keypoints: List[Keypoint]) -> Tuple[float, float]: |
|
|
"""Compute confidence-weighted center of mass of the body.""" |
|
|
major_joints = ["left_hip", "right_hip", "left_shoulder", "right_shoulder"] |
|
|
|
|
|
weighted_x = 0.0 |
|
|
weighted_y = 0.0 |
|
|
total_weight = 0.0 |
|
|
|
|
|
for kp in keypoints: |
|
|
if kp.name in major_joints and kp.confidence > 0.5: |
|
|
weight = kp.confidence |
|
|
weighted_x += kp.x * weight |
|
|
weighted_y += kp.y * weight |
|
|
total_weight += weight |
|
|
|
|
|
|
|
|
if total_weight == 0: |
|
|
for kp in keypoints: |
|
|
if kp.confidence > 0.3: |
|
|
weight = kp.confidence |
|
|
weighted_x += kp.x * weight |
|
|
weighted_y += kp.y * weight |
|
|
total_weight += weight |
|
|
|
|
|
if total_weight > 0: |
|
|
return (weighted_x / total_weight, weighted_y / total_weight) |
|
|
return (0.5, 0.5) |
|
|
|
|
|
def _compute_body_center(self, keypoints: List[Keypoint]) -> Tuple[float, float]: |
|
|
"""Compute the center of mass of the body.""" |
|
|
major_joints = ["left_hip", "right_hip", "left_shoulder", "right_shoulder"] |
|
|
|
|
|
x_coords = [] |
|
|
y_coords = [] |
|
|
|
|
|
for kp in keypoints: |
|
|
if kp.confidence > 0.5 and kp.name in major_joints: |
|
|
x_coords.append(kp.x) |
|
|
y_coords.append(kp.y) |
|
|
|
|
|
if not x_coords or not y_coords: |
|
|
x_coords = [kp.x for kp in keypoints if kp.confidence > 0.3] |
|
|
y_coords = [kp.y for kp in keypoints if kp.confidence > 0.3] |
|
|
|
|
|
if x_coords and y_coords: |
|
|
return (np.mean(x_coords), np.mean(y_coords)) |
|
|
return (0.5, 0.5) |
|
|
|
|
|
def _get_limb_positions(self, keypoints: List[Keypoint]) -> Dict[str, Tuple[float, float]]: |
|
|
"""Get positions of major limbs. (Currently not heavily used beyond potential debugging)""" |
|
|
positions = {} |
|
|
for kp in keypoints: |
|
|
if kp.confidence > 0.3 and kp.name: |
|
|
positions[kp.name] = (kp.x, kp.y) |
|
|
return positions |
|
|
|
|
|
def _compute_direction(self, displacement: Tuple[float, float]) -> Direction: |
|
|
"""Compute movement direction from displacement vector.""" |
|
|
dx, dy = displacement |
|
|
threshold = 0.005 |
|
|
|
|
|
if abs(dx) < threshold and abs(dy) < threshold: |
|
|
return Direction.STATIONARY |
|
|
|
|
|
if abs(dx) > abs(dy): |
|
|
return Direction.RIGHT if dx > 0 else Direction.LEFT |
|
|
else: |
|
|
return Direction.DOWN if dy > 0 else Direction.UP |
|
|
|
|
|
def _categorize_speed(self, velocity: float) -> Speed: |
|
|
"""Categorize velocity into speed levels (velocity is in normalized units/sec).""" |
|
|
if velocity < self.velocity_threshold_slow: |
|
|
return Speed.SLOW |
|
|
elif velocity < self.velocity_threshold_fast: |
|
|
return Speed.MODERATE |
|
|
else: |
|
|
return Speed.FAST |
|
|
|
|
|
def _compute_intensity(self, acceleration: float, velocity: float) -> Intensity: |
|
|
"""Compute movement intensity (accel in norm_units/sec^2, vel in norm_units/sec).""" |
|
|
|
|
|
if acceleration > self.intensity_accel_threshold * 2 or velocity > self.velocity_threshold_fast: |
|
|
return Intensity.HIGH |
|
|
elif acceleration > self.intensity_accel_threshold or velocity > self.velocity_threshold_slow: |
|
|
return Intensity.MEDIUM |
|
|
else: |
|
|
return Intensity.LOW |
|
|
|
|
|
def _compute_fluidity(self, acceleration: float) -> float: |
|
|
""" |
|
|
Compute fluidity score (0-1) based on acceleration (norm_units/sec^2). |
|
|
Lower acceleration = higher fluidity. |
|
|
""" |
|
|
max_expected_accel = 0.2 |
|
|
|
|
|
norm_accel = min(abs(acceleration) / max_expected_accel, 1.0) if max_expected_accel > 0 else 0.0 |
|
|
return 1.0 - norm_accel |
|
|
|
|
|
def _compute_expansion(self, keypoints: List[Keypoint]) -> float: |
|
|
""" |
|
|
Compute how expanded/contracted the pose is. |
|
|
Returns 0-1 where 1 is fully expanded. |
|
|
""" |
|
|
limb_pairs = [ |
|
|
("left_wrist", "right_wrist"), |
|
|
("left_ankle", "right_ankle"), |
|
|
("left_wrist", "left_ankle"), |
|
|
("right_wrist", "right_ankle"), |
|
|
|
|
|
] |
|
|
|
|
|
kp_dict = {kp.name: kp for kp in keypoints if kp.confidence > 0.3 and kp.name} |
|
|
if not kp_dict: return 0.5 |
|
|
|
|
|
distances = [] |
|
|
for name1, name2 in limb_pairs: |
|
|
if name1 in kp_dict and name2 in kp_dict: |
|
|
kp1 = kp_dict[name1] |
|
|
kp2 = kp_dict[name2] |
|
|
|
|
|
if not (np.isnan(kp1.x) or np.isnan(kp1.y) or np.isnan(kp2.x) or np.isnan(kp2.y)): |
|
|
dist = np.sqrt((kp1.x - kp2.x)**2 + (kp1.y - kp2.y)**2) |
|
|
distances.append(dist) |
|
|
|
|
|
if distances: |
|
|
avg_dist = np.mean(distances) |
|
|
|
|
|
|
|
|
max_possible_dist_heuristic = 1.0 |
|
|
return min(avg_dist / max_possible_dist_heuristic, 1.0) if max_possible_dist_heuristic > 0 else 0.0 |
|
|
|
|
|
return 0.5 |
|
|
|
|
|
def _compute_adaptive_thresholds(self, metrics: List[MovementMetrics]): |
|
|
"""Compute adaptive thresholds based on movement characteristics in the video.""" |
|
|
velocities = [m.velocity for m in metrics if m.velocity > 0] |
|
|
accelerations = [abs(m.acceleration) for m in metrics if m.acceleration != 0] |
|
|
|
|
|
if velocities: |
|
|
velocity_percentiles = np.percentile(velocities, [25, 75]) |
|
|
self.velocity_threshold_slow = max(velocity_percentiles[0], 0.005) |
|
|
self.velocity_threshold_fast = velocity_percentiles[1] |
|
|
|
|
|
if accelerations: |
|
|
accel_75th = np.percentile(accelerations, 75) |
|
|
self.intensity_accel_threshold = max(accel_75th * 0.5, 0.01) |
|
|
|
|
|
self.adaptive_thresholds_computed = True |
|
|
|
|
|
def _recompute_with_adaptive_thresholds(self, metrics: List[MovementMetrics]) -> List[MovementMetrics]: |
|
|
"""Recompute speed and intensity classifications with adaptive thresholds.""" |
|
|
for metric in metrics: |
|
|
if metric.velocity > 0: |
|
|
metric.speed = self._categorize_speed(metric.velocity) |
|
|
metric.intensity = self._compute_intensity(metric.acceleration, metric.velocity) |
|
|
return metrics |
|
|
|
|
|
def _smooth_metrics(self, metrics_list: List[MovementMetrics]) -> List[MovementMetrics]: |
|
|
"""Apply smoothing to reduce noise in metrics using a simple moving average.""" |
|
|
window_size = 3 |
|
|
num_metrics = len(metrics_list) |
|
|
|
|
|
if num_metrics <= window_size: |
|
|
return metrics_list |
|
|
|
|
|
smoothed_metrics_list = metrics_list[:] |
|
|
|
|
|
|
|
|
fields_to_smooth = ["velocity", "acceleration", "fluidity", "expansion"] |
|
|
|
|
|
for i in range(num_metrics): |
|
|
start_idx = max(0, i - window_size // 2) |
|
|
end_idx = min(num_metrics, i + window_size // 2 + 1) |
|
|
window = metrics_list[start_idx:end_idx] |
|
|
|
|
|
if not window: continue |
|
|
|
|
|
for field in fields_to_smooth: |
|
|
values = [getattr(m, field) for m in window if hasattr(m, field)] |
|
|
if values: |
|
|
setattr(smoothed_metrics_list[i], field, np.mean(values)) |
|
|
|
|
|
return smoothed_metrics_list |
|
|
|
|
|
|
|
|
def analyze_pose_sequence(pose_sequence: List[List[PoseResult]], |
|
|
fps: float = 30.0) -> List[MovementMetrics]: |
|
|
""" |
|
|
Convenience function to analyze a pose sequence. |
|
|
|
|
|
Args: |
|
|
pose_sequence: List of pose results per frame |
|
|
fps: Video frame rate |
|
|
|
|
|
Returns: |
|
|
List of movement metrics |
|
|
""" |
|
|
analyzer = MovementAnalyzer(fps=fps) |
|
|
return analyzer.analyze_movement(pose_sequence) |