""" Laban Movement Analysis (LMA) inspired notation engine. Computes movement metrics like direction, intensity, and speed from pose keypoints. """ import numpy as np from typing import List, Dict, Optional, Tuple, Any from dataclasses import dataclass from enum import Enum from .pose_estimation import PoseResult, Keypoint class Direction(Enum): """Movement direction categories.""" UP = "up" DOWN = "down" LEFT = "left" RIGHT = "right" FORWARD = "forward" # Note: Not fully implemented with 2D keypoints BACKWARD = "backward" # Note: Not fully implemented with 2D keypoints STATIONARY = "stationary" class Intensity(Enum): """Movement intensity levels.""" LOW = "low" MEDIUM = "medium" HIGH = "high" class Speed(Enum): """Movement speed categories.""" SLOW = "slow" MODERATE = "moderate" FAST = "fast" @dataclass class MovementMetrics: """LMA-inspired movement metrics for a frame or segment.""" frame_index: int timestamp: Optional[float] = None # Primary metrics direction: Direction = Direction.STATIONARY intensity: Intensity = Intensity.LOW speed: Speed = Speed.SLOW # Numeric values velocity: float = 0.0 # pixels/second or normalized units acceleration: float = 0.0 # Additional qualities fluidity: float = 0.0 # 0-1, smoothness of movement expansion: float = 0.0 # 0-1, how spread out the pose is # Raw displacement data center_displacement: Optional[Tuple[float, float]] = None total_displacement: float = 0.0 class SimpleKalmanFilter: """Lightweight Kalman filter for position/velocity tracking.""" def __init__(self, process_noise: float = 0.01, measurement_noise: float = 0.1): self.process_noise = process_noise self.measurement_noise = measurement_noise self.is_initialized = False # State: [x, y, vx, vy] self.state = np.zeros(4) self.covariance = np.eye(4) * 0.1 # Transition matrix (constant velocity model) self.F = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]]) # Measurement matrix (observe position only) self.H = np.array([[1, 0, 0, 0], [0, 1, 0, 0]]) # Process noise matrix self.Q = np.eye(4) * process_noise # Measurement noise matrix self.R = np.eye(2) * measurement_noise def predict(self, dt: float = 1.0): """Predict next state.""" # Update transition matrix with time step self.F[0, 2] = dt self.F[1, 3] = dt # Predict state self.state = self.F @ self.state self.covariance = self.F @ self.covariance @ self.F.T + self.Q def update(self, measurement: Tuple[float, float]): """Update with measurement.""" z = np.array([measurement[0], measurement[1]]) if not self.is_initialized: self.state[:2] = z self.is_initialized = True return # Calculate Kalman gain S = self.H @ self.covariance @ self.H.T + self.R K = self.covariance @ self.H.T @ np.linalg.inv(S) # Update state y = z - self.H @ self.state self.state = self.state + K @ y self.covariance = (np.eye(4) - K @ self.H) @ self.covariance def get_position(self) -> Tuple[float, float]: """Get filtered position.""" return (self.state[0], self.state[1]) def get_velocity(self) -> Tuple[float, float]: """Get filtered velocity.""" return (self.state[2], self.state[3]) class MovementAnalyzer: """Analyzes pose sequences to extract LMA-style movement metrics.""" def __init__(self, fps: float = 30.0, velocity_threshold_slow: float = 0.01, velocity_threshold_fast: float = 0.1, intensity_accel_threshold: float = 0.05, use_kalman_filter: bool = True, use_adaptive_thresholds: bool = True): """ Initialize movement analyzer with advanced features. Args: fps: Frames per second of the video velocity_threshold_slow: Initial threshold for slow movement velocity_threshold_fast: Initial threshold for fast movement intensity_accel_threshold: Initial acceleration threshold for intensity use_kalman_filter: Whether to use Kalman filtering for tracking use_adaptive_thresholds: Whether to adapt thresholds based on video content """ self.fps = fps self.frame_duration = 1.0 / fps if fps > 0 else 0.0 self.velocity_threshold_slow = velocity_threshold_slow self.velocity_threshold_fast = velocity_threshold_fast self.intensity_accel_threshold = intensity_accel_threshold self.use_kalman_filter = use_kalman_filter self.use_adaptive_thresholds = use_adaptive_thresholds # Kalman filter for tracking self.kalman_filter = SimpleKalmanFilter() if use_kalman_filter else None # Adaptive threshold parameters self.adaptive_thresholds_computed = False def analyze_movement(self, pose_sequence: List[List[PoseResult]]) -> List[MovementMetrics]: """ Analyze a sequence of poses to compute movement metrics. Args: pose_sequence: List of pose results per frame. Each inner list can contain multiple PoseResult if multiple people are detected. Returns: List of movement metrics per frame (currently for the first detected person). """ if not pose_sequence: return [] metrics = [] # For multi-person, these would need to be dictionaries mapping person_id to values prev_centers = None # Store as {person_id: center_coords} for multi-person prev_velocity = None # Store as {person_id: velocity_value} for multi-person for frame_idx, frame_poses in enumerate(pose_sequence): if not frame_poses: # No pose detected in this frame metrics.append(MovementMetrics( frame_index=frame_idx, timestamp=frame_idx * self.frame_duration if self.frame_duration else None )) continue # --- CURRENT: Analyze first person only --- # TODO: Extend to multi-person analysis. This would involve iterating # through frame_poses and tracking metrics for each person_id. pose = frame_poses[0] # --- END CURRENT --- if not pose.keypoints: # Ensure pose object has keypoints metrics.append(MovementMetrics( frame_index=frame_idx, timestamp=frame_idx * self.frame_duration if self.frame_duration else None )) # Reset for next frame if this person was being tracked prev_centers = None # Or prev_centers.pop(person_id, None) prev_velocity = None # Or prev_velocity.pop(person_id, None) continue # Compute confidence-weighted body center center = self._compute_body_center_weighted(pose.keypoints) # Apply Kalman filtering if enabled if self.kalman_filter: self.kalman_filter.predict(self.frame_duration) self.kalman_filter.update(center) center = self.kalman_filter.get_position() # Initialize metrics for this frame frame_metrics = MovementMetrics( frame_index=frame_idx, timestamp=frame_idx * self.frame_duration if self.frame_duration else None ) # Displacement, velocity, etc. can only be computed if there's a previous frame's center if prev_centers is not None and frame_idx > 0 and self.fps > 0: displacement = ( center[0] - prev_centers[0], center[1] - prev_centers[1] ) frame_metrics.center_displacement = displacement frame_metrics.total_displacement = np.sqrt( displacement[0]**2 + displacement[1]**2 ) frame_metrics.velocity = frame_metrics.total_displacement * self.fps frame_metrics.direction = self._compute_direction(displacement) frame_metrics.speed = self._categorize_speed(frame_metrics.velocity) if prev_velocity is not None: # Acceleration (change in velocity per second) delta_velocity = frame_metrics.velocity - prev_velocity frame_metrics.acceleration = delta_velocity * self.fps # (units/s)/s = units/s^2 frame_metrics.intensity = self._compute_intensity( frame_metrics.acceleration, frame_metrics.velocity ) frame_metrics.fluidity = self._compute_fluidity( frame_metrics.acceleration ) frame_metrics.expansion = self._compute_expansion(pose.keypoints) metrics.append(frame_metrics) # Update for next iteration (for the currently tracked person) prev_centers = center prev_velocity = frame_metrics.velocity # Apply adaptive thresholds if enabled if self.use_adaptive_thresholds and not self.adaptive_thresholds_computed: self._compute_adaptive_thresholds(metrics) # Recompute speed and intensity with new thresholds metrics = self._recompute_with_adaptive_thresholds(metrics) metrics = self._smooth_metrics(metrics) return metrics def _compute_body_center_weighted(self, keypoints: List[Keypoint]) -> Tuple[float, float]: """Compute confidence-weighted center of mass of the body.""" major_joints = ["left_hip", "right_hip", "left_shoulder", "right_shoulder"] weighted_x = 0.0 weighted_y = 0.0 total_weight = 0.0 for kp in keypoints: if kp.name in major_joints and kp.confidence > 0.5: weight = kp.confidence weighted_x += kp.x * weight weighted_y += kp.y * weight total_weight += weight # Fallback to all keypoints if no major joints found if total_weight == 0: for kp in keypoints: if kp.confidence > 0.3: weight = kp.confidence weighted_x += kp.x * weight weighted_y += kp.y * weight total_weight += weight if total_weight > 0: return (weighted_x / total_weight, weighted_y / total_weight) return (0.5, 0.5) def _compute_body_center(self, keypoints: List[Keypoint]) -> Tuple[float, float]: """Compute the center of mass of the body.""" major_joints = ["left_hip", "right_hip", "left_shoulder", "right_shoulder"] x_coords = [] y_coords = [] for kp in keypoints: if kp.confidence > 0.5 and kp.name in major_joints: # Ensure kp.name is not None x_coords.append(kp.x) y_coords.append(kp.y) if not x_coords or not y_coords: # If no major joints, try all keypoints x_coords = [kp.x for kp in keypoints if kp.confidence > 0.3] y_coords = [kp.y for kp in keypoints if kp.confidence > 0.3] if x_coords and y_coords: # Check if lists are non-empty return (np.mean(x_coords), np.mean(y_coords)) return (0.5, 0.5) # Default center if no reliable keypoints def _get_limb_positions(self, keypoints: List[Keypoint]) -> Dict[str, Tuple[float, float]]: """Get positions of major limbs. (Currently not heavily used beyond potential debugging)""" positions = {} for kp in keypoints: if kp.confidence > 0.3 and kp.name: positions[kp.name] = (kp.x, kp.y) return positions def _compute_direction(self, displacement: Tuple[float, float]) -> Direction: """Compute movement direction from displacement vector.""" dx, dy = displacement threshold = 0.005 # Normalized units per frame if abs(dx) < threshold and abs(dy) < threshold: return Direction.STATIONARY if abs(dx) > abs(dy): return Direction.RIGHT if dx > 0 else Direction.LEFT else: return Direction.DOWN if dy > 0 else Direction.UP # dy positive is typically down in image coords def _categorize_speed(self, velocity: float) -> Speed: """Categorize velocity into speed levels (velocity is in normalized units/sec).""" if velocity < self.velocity_threshold_slow: return Speed.SLOW elif velocity < self.velocity_threshold_fast: return Speed.MODERATE # Corrected from Speed.FAST else: return Speed.FAST def _compute_intensity(self, acceleration: float, velocity: float) -> Intensity: """Compute movement intensity (accel in norm_units/sec^2, vel in norm_units/sec).""" # Thresholds are relative to normalized space and per-second metrics if acceleration > self.intensity_accel_threshold * 2 or velocity > self.velocity_threshold_fast: return Intensity.HIGH elif acceleration > self.intensity_accel_threshold or velocity > self.velocity_threshold_slow: return Intensity.MEDIUM else: return Intensity.LOW def _compute_fluidity(self, acceleration: float) -> float: """ Compute fluidity score (0-1) based on acceleration (norm_units/sec^2). Lower acceleration = higher fluidity. """ max_expected_accel = 0.2 # This is an assumption for normalization, might need tuning. # Represents a fairly high acceleration in normalized units/sec^2. norm_accel = min(abs(acceleration) / max_expected_accel, 1.0) if max_expected_accel > 0 else 0.0 return 1.0 - norm_accel def _compute_expansion(self, keypoints: List[Keypoint]) -> float: """ Compute how expanded/contracted the pose is. Returns 0-1 where 1 is fully expanded. """ limb_pairs = [ ("left_wrist", "right_wrist"), ("left_ankle", "right_ankle"), ("left_wrist", "left_ankle"), ("right_wrist", "right_ankle"), # Could add torso diagonals like ("left_shoulder", "right_hip") ] kp_dict = {kp.name: kp for kp in keypoints if kp.confidence > 0.3 and kp.name} if not kp_dict: return 0.5 # No reliable keypoints distances = [] for name1, name2 in limb_pairs: if name1 in kp_dict and name2 in kp_dict: kp1 = kp_dict[name1] kp2 = kp_dict[name2] # Ensure coordinates are not NaN before calculation if not (np.isnan(kp1.x) or np.isnan(kp1.y) or np.isnan(kp2.x) or np.isnan(kp2.y)): dist = np.sqrt((kp1.x - kp2.x)**2 + (kp1.y - kp2.y)**2) distances.append(dist) if distances: avg_dist = np.mean(distances) # Max expected distance (e.g., diagonal of normalized 1x1 space is sqrt(2) approx 1.414) # This assumes keypoints are normalized. max_possible_dist_heuristic = 1.0 # A more conservative heuristic than 1.4, as limbs rarely span the full diagonal. return min(avg_dist / max_possible_dist_heuristic, 1.0) if max_possible_dist_heuristic > 0 else 0.0 return 0.5 # Default neutral expansion if no valid pairs def _compute_adaptive_thresholds(self, metrics: List[MovementMetrics]): """Compute adaptive thresholds based on movement characteristics in the video.""" velocities = [m.velocity for m in metrics if m.velocity > 0] accelerations = [abs(m.acceleration) for m in metrics if m.acceleration != 0] if velocities: velocity_percentiles = np.percentile(velocities, [25, 75]) self.velocity_threshold_slow = max(velocity_percentiles[0], 0.005) self.velocity_threshold_fast = velocity_percentiles[1] if accelerations: accel_75th = np.percentile(accelerations, 75) self.intensity_accel_threshold = max(accel_75th * 0.5, 0.01) self.adaptive_thresholds_computed = True def _recompute_with_adaptive_thresholds(self, metrics: List[MovementMetrics]) -> List[MovementMetrics]: """Recompute speed and intensity classifications with adaptive thresholds.""" for metric in metrics: if metric.velocity > 0: metric.speed = self._categorize_speed(metric.velocity) metric.intensity = self._compute_intensity(metric.acceleration, metric.velocity) return metrics def _smooth_metrics(self, metrics_list: List[MovementMetrics]) -> List[MovementMetrics]: """Apply smoothing to reduce noise in metrics using a simple moving average.""" window_size = 3 num_metrics = len(metrics_list) if num_metrics <= window_size: return metrics_list smoothed_metrics_list = metrics_list[:] # Work on a copy # Fields to smooth fields_to_smooth = ["velocity", "acceleration", "fluidity", "expansion"] for i in range(num_metrics): start_idx = max(0, i - window_size // 2) end_idx = min(num_metrics, i + window_size // 2 + 1) window = metrics_list[start_idx:end_idx] if not window: continue for field in fields_to_smooth: values = [getattr(m, field) for m in window if hasattr(m, field)] if values: setattr(smoothed_metrics_list[i], field, np.mean(values)) return smoothed_metrics_list def analyze_pose_sequence(pose_sequence: List[List[PoseResult]], fps: float = 30.0) -> List[MovementMetrics]: """ Convenience function to analyze a pose sequence. Args: pose_sequence: List of pose results per frame fps: Video frame rate Returns: List of movement metrics """ analyzer = MovementAnalyzer(fps=fps) return analyzer.analyze_movement(pose_sequence)