Spaces:
Paused
Paused
| """ | |
| Movement Classifier - Advanced movement intelligence and analysis | |
| Classifies dance movements, calculates intensity, and detects patterns | |
| """ | |
| import numpy as np | |
| from typing import List, Dict, Tuple, Optional | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| import logging | |
| from app.config import Config | |
| from app.core.pose_analyzer import PoseKeypoints | |
| from app.utils.validation import safe_divide | |
| logger = logging.getLogger(__name__) | |
| class MovementType(Enum): | |
| """Enumeration of movement types""" | |
| STANDING = "Standing" | |
| WALKING = "Walking" | |
| DANCING = "Dancing" | |
| JUMPING = "Jumping" | |
| CROUCHING = "Crouching" | |
| UNKNOWN = "Unknown" | |
| class MovementMetrics: | |
| """Data class for movement analysis results""" | |
| movement_type: MovementType | |
| intensity: float # 0-100 scale | |
| velocity: float # Average velocity | |
| body_part_activity: Dict[str, float] # Activity level per body part | |
| frame_range: Tuple[int, int] # Start and end frame numbers | |
| class MovementClassifier: | |
| """ | |
| Analyzes pose sequences to classify movements and calculate metrics | |
| """ | |
| # Body part groupings using MediaPipe landmark indices | |
| BODY_PARTS = { | |
| "head": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], # Face and head | |
| "torso": [11, 12, 23, 24], # Shoulders and hips | |
| "left_arm": [11, 13, 15, 17, 19, 21], # Left shoulder to hand | |
| "right_arm": [12, 14, 16, 18, 20, 22], # Right shoulder to hand | |
| "left_leg": [23, 25, 27, 29, 31], # Left hip to foot | |
| "right_leg": [24, 26, 28, 30, 32] # Right hip to foot | |
| } | |
| def __init__(self, smoothing_window: int = 5): | |
| """ | |
| Initialize movement classifier | |
| Args: | |
| smoothing_window: Number of frames for smoothing calculations | |
| """ | |
| self.smoothing_window = smoothing_window | |
| self.movement_history: List[MovementMetrics] = [] | |
| logger.info("MovementClassifier initialized") | |
| def analyze_sequence(self, keypoints_sequence: List[PoseKeypoints]) -> MovementMetrics: | |
| """ | |
| Analyze a sequence of pose keypoints to classify movement | |
| Args: | |
| keypoints_sequence: List of detected pose keypoints | |
| Returns: | |
| MovementMetrics object with analysis results | |
| """ | |
| if not keypoints_sequence: | |
| return self._create_empty_metrics() | |
| # Calculate velocities between consecutive frames | |
| velocities = self._calculate_velocities(keypoints_sequence) | |
| # Calculate average velocity (overall movement speed) | |
| avg_velocity = np.mean(velocities) if len(velocities) > 0 else 0.0 | |
| # Classify movement type based on velocity and pose characteristics | |
| movement_type = self._classify_movement(keypoints_sequence, avg_velocity) | |
| # Calculate movement intensity (0-100 scale) | |
| intensity = self._calculate_intensity(velocities, movement_type) | |
| # Analyze activity per body part | |
| body_part_activity = self._calculate_body_part_activity(keypoints_sequence) | |
| # Get frame range | |
| frame_range = ( | |
| keypoints_sequence[0].frame_number, | |
| keypoints_sequence[-1].frame_number | |
| ) | |
| metrics = MovementMetrics( | |
| movement_type=movement_type, | |
| intensity=intensity, | |
| velocity=avg_velocity, | |
| body_part_activity=body_part_activity, | |
| frame_range=frame_range | |
| ) | |
| self.movement_history.append(metrics) | |
| logger.info(f"Analyzed sequence: {movement_type.value}, " | |
| f"Intensity: {intensity:.1f}, Velocity: {avg_velocity:.4f}") | |
| return metrics | |
| def _calculate_velocities(self, keypoints_sequence: List[PoseKeypoints]) -> np.ndarray: | |
| """ | |
| Calculate frame-to-frame velocities for all keypoints | |
| Args: | |
| keypoints_sequence: List of pose keypoints | |
| Returns: | |
| Array of velocities (one per frame transition) | |
| """ | |
| if len(keypoints_sequence) < 2: | |
| return np.array([0.0]) | |
| velocities = [] | |
| for i in range(1, len(keypoints_sequence)): | |
| prev_landmarks = keypoints_sequence[i-1].landmarks[:, :2] # x, y only | |
| curr_landmarks = keypoints_sequence[i].landmarks[:, :2] | |
| # Calculate Euclidean distance for each keypoint | |
| displacement = np.linalg.norm(curr_landmarks - prev_landmarks, axis=1) | |
| # Average displacement across all keypoints | |
| avg_displacement = np.mean(displacement) | |
| # Time difference (assuming constant fps) | |
| time_diff = keypoints_sequence[i].timestamp - keypoints_sequence[i-1].timestamp | |
| # Velocity = displacement / time | |
| velocity = safe_divide(avg_displacement, time_diff, 0.0) | |
| velocities.append(velocity) | |
| return np.array(velocities) | |
| def _classify_movement(self, keypoints_sequence: List[PoseKeypoints], | |
| avg_velocity: float) -> MovementType: | |
| """ | |
| Classify movement type based on velocity and pose characteristics | |
| Args: | |
| keypoints_sequence: List of pose keypoints | |
| avg_velocity: Average velocity across sequence | |
| Returns: | |
| MovementType classification | |
| """ | |
| # Check for jumping (vertical movement of center of mass) | |
| if self._detect_jumping(keypoints_sequence): | |
| return MovementType.JUMPING | |
| # Check for crouching (low body position) | |
| if self._detect_crouching(keypoints_sequence): | |
| return MovementType.CROUCHING | |
| # Classify based on velocity thresholds | |
| if avg_velocity < Config.VELOCITY_STANDING: | |
| return MovementType.STANDING | |
| elif avg_velocity < Config.VELOCITY_WALKING: | |
| return MovementType.WALKING | |
| elif avg_velocity < Config.VELOCITY_DANCING: | |
| return MovementType.DANCING | |
| else: | |
| # High velocity movements are likely dancing | |
| return MovementType.DANCING | |
| def _detect_jumping(self, keypoints_sequence: List[PoseKeypoints]) -> bool: | |
| """ | |
| Detect jumping motion by analyzing vertical hip movement | |
| Args: | |
| keypoints_sequence: List of pose keypoints | |
| Returns: | |
| True if jumping detected | |
| """ | |
| if len(keypoints_sequence) < 5: | |
| return False | |
| # Get hip positions (landmarks 23 and 24) | |
| hip_y_positions = [] | |
| for kp in keypoints_sequence: | |
| left_hip_y = kp.landmarks[23, 1] | |
| right_hip_y = kp.landmarks[24, 1] | |
| avg_hip_y = (left_hip_y + right_hip_y) / 2 | |
| hip_y_positions.append(avg_hip_y) | |
| hip_y_positions = np.array(hip_y_positions) | |
| # Calculate vertical velocity | |
| vertical_velocity = np.abs(np.diff(hip_y_positions)) | |
| # Jumping has high vertical velocity peaks | |
| max_vertical_velocity = np.max(vertical_velocity) | |
| return max_vertical_velocity > Config.VELOCITY_JUMPING | |
| def _detect_crouching(self, keypoints_sequence: List[PoseKeypoints]) -> bool: | |
| """ | |
| Detect crouching by analyzing hip-to-shoulder distance | |
| Args: | |
| keypoints_sequence: List of pose keypoints | |
| Returns: | |
| True if crouching detected | |
| """ | |
| if not keypoints_sequence: | |
| return False | |
| # Use middle frame for analysis | |
| mid_idx = len(keypoints_sequence) // 2 | |
| landmarks = keypoints_sequence[mid_idx].landmarks | |
| # Calculate average shoulder position (landmarks 11, 12) | |
| shoulder_y = (landmarks[11, 1] + landmarks[12, 1]) / 2 | |
| # Calculate average hip position (landmarks 23, 24) | |
| hip_y = (landmarks[23, 1] + landmarks[24, 1]) / 2 | |
| # Calculate torso length | |
| torso_length = abs(hip_y - shoulder_y) | |
| # Crouching: torso is compressed (small torso length) | |
| # This is relative, so we use a threshold | |
| return torso_length < 0.15 # Normalized coordinates | |
| def _calculate_intensity(self, velocities: np.ndarray, | |
| movement_type: MovementType) -> float: | |
| """ | |
| Calculate movement intensity on 0-100 scale | |
| Args: | |
| velocities: Array of velocities | |
| movement_type: Classified movement type | |
| Returns: | |
| Intensity score (0-100) | |
| """ | |
| if len(velocities) == 0: | |
| return 0.0 | |
| # Calculate base intensity from velocity | |
| avg_velocity = np.mean(velocities) | |
| velocity_std = np.std(velocities) | |
| # Normalize velocity to 0-100 scale | |
| # Higher velocity and variation = higher intensity | |
| base_intensity = min(avg_velocity * 500, 70) # Cap at 70 | |
| variation_bonus = min(velocity_std * 300, 30) # Up to 30 bonus | |
| raw_intensity = base_intensity + variation_bonus | |
| # Apply movement type multipliers | |
| multipliers = { | |
| MovementType.STANDING: 0.1, | |
| MovementType.WALKING: 0.4, | |
| MovementType.DANCING: 1.0, | |
| MovementType.JUMPING: 1.2, | |
| MovementType.CROUCHING: 0.3, | |
| MovementType.UNKNOWN: 0.5 | |
| } | |
| intensity = raw_intensity * multipliers.get(movement_type, 1.0) | |
| # Clamp to 0-100 range | |
| return np.clip(intensity, 0, 100) | |
| def _calculate_body_part_activity(self, | |
| keypoints_sequence: List[PoseKeypoints]) -> Dict[str, float]: | |
| """ | |
| Calculate activity level for each body part | |
| Args: | |
| keypoints_sequence: List of pose keypoints | |
| Returns: | |
| Dictionary mapping body part names to activity scores (0-100) | |
| """ | |
| if len(keypoints_sequence) < 2: | |
| return {part: 0.0 for part in self.BODY_PARTS.keys()} | |
| activity_scores = {} | |
| for part_name, landmark_indices in self.BODY_PARTS.items(): | |
| total_movement = 0.0 | |
| # Calculate movement for this body part across all frames | |
| for i in range(1, len(keypoints_sequence)): | |
| prev_landmarks = keypoints_sequence[i-1].landmarks[landmark_indices, :2] | |
| curr_landmarks = keypoints_sequence[i].landmarks[landmark_indices, :2] | |
| # Calculate average movement for this body part | |
| displacement = np.linalg.norm(curr_landmarks - prev_landmarks, axis=1) | |
| avg_displacement = np.mean(displacement) | |
| total_movement += avg_displacement | |
| # Normalize to 0-100 scale | |
| avg_movement = total_movement / (len(keypoints_sequence) - 1) | |
| activity_score = min(avg_movement * 1000, 100) # Scale and cap at 100 | |
| activity_scores[part_name] = activity_score | |
| return activity_scores | |
| def get_movement_summary(self) -> Dict[str, any]: | |
| """ | |
| Get summary statistics of all analyzed movements | |
| Returns: | |
| Dictionary with summary statistics | |
| """ | |
| if not self.movement_history: | |
| return { | |
| "total_sequences": 0, | |
| "average_intensity": 0.0, | |
| "movement_distribution": {}, | |
| "most_active_body_part": "none" | |
| } | |
| # Count movement types | |
| movement_counts = {} | |
| for metrics in self.movement_history: | |
| movement_type = metrics.movement_type.value | |
| movement_counts[movement_type] = movement_counts.get(movement_type, 0) + 1 | |
| # Calculate average intensity | |
| avg_intensity = np.mean([m.intensity for m in self.movement_history]) | |
| # Find most active body part across all sequences | |
| all_body_parts = {} | |
| for metrics in self.movement_history: | |
| for part, activity in metrics.body_part_activity.items(): | |
| if part not in all_body_parts: | |
| all_body_parts[part] = [] | |
| all_body_parts[part].append(activity) | |
| avg_body_part_activity = { | |
| part: np.mean(activities) | |
| for part, activities in all_body_parts.items() | |
| } | |
| most_active_part = max(avg_body_part_activity.items(), key=lambda x: x[1])[0] | |
| return { | |
| "total_sequences": len(self.movement_history), | |
| "average_intensity": round(avg_intensity, 2), | |
| "movement_distribution": movement_counts, | |
| "most_active_body_part": most_active_part, | |
| "avg_body_part_activity": { | |
| k: round(v, 2) for k, v in avg_body_part_activity.items() | |
| } | |
| } | |
| def detect_rhythm_patterns(self, keypoints_sequence: List[PoseKeypoints], | |
| fps: float) -> Dict[str, any]: | |
| """ | |
| Detect rhythmic patterns in movement (basic beat detection) | |
| Args: | |
| keypoints_sequence: List of pose keypoints | |
| fps: Video frames per second | |
| Returns: | |
| Dictionary with rhythm analysis | |
| """ | |
| if len(keypoints_sequence) < 10: | |
| return {"has_rhythm": False, "estimated_bpm": 0} | |
| # Calculate velocities | |
| velocities = self._calculate_velocities(keypoints_sequence) | |
| # Apply smoothing | |
| if len(velocities) > self.smoothing_window: | |
| kernel = np.ones(self.smoothing_window) / self.smoothing_window | |
| smoothed_velocities = np.convolve(velocities, kernel, mode='valid') | |
| else: | |
| smoothed_velocities = velocities | |
| # Find peaks in velocity (potential beats) | |
| peaks = self._find_peaks(smoothed_velocities) | |
| if len(peaks) < 2: | |
| return {"has_rhythm": False, "estimated_bpm": 0} | |
| # Calculate average time between peaks | |
| peak_intervals = np.diff(peaks) / fps # Convert to seconds | |
| avg_interval = np.mean(peak_intervals) | |
| # Calculate BPM (beats per minute) | |
| bpm = safe_divide(60, avg_interval, 0) | |
| # Check if rhythm is consistent (low standard deviation) | |
| interval_std = np.std(peak_intervals) | |
| is_rhythmic = interval_std < (avg_interval * 0.3) # Within 30% variation | |
| return { | |
| "has_rhythm": is_rhythmic, | |
| "estimated_bpm": round(bpm, 1), | |
| "peak_count": len(peaks), | |
| "rhythm_consistency": round(1 - (interval_std / avg_interval), 2) if avg_interval > 0 else 0 | |
| } | |
| def _find_peaks(self, signal: np.ndarray, threshold_percentile: float = 70) -> np.ndarray: | |
| """ | |
| Find peaks in a signal (simple peak detection) | |
| Args: | |
| signal: 1D signal array | |
| threshold_percentile: Percentile threshold for peak detection | |
| Returns: | |
| Array of peak indices | |
| """ | |
| if len(signal) < 3: | |
| return np.array([]) | |
| # Calculate threshold | |
| threshold = np.percentile(signal, threshold_percentile) | |
| peaks = [] | |
| for i in range(1, len(signal) - 1): | |
| # Peak: higher than neighbors and above threshold | |
| if (signal[i] > signal[i-1] and | |
| signal[i] > signal[i+1] and | |
| signal[i] > threshold): | |
| peaks.append(i) | |
| return np.array(peaks) | |
| def calculate_movement_smoothness(self, keypoints_sequence: List[PoseKeypoints]) -> float: | |
| """ | |
| Calculate smoothness of movement (lower jerk = smoother) | |
| Args: | |
| keypoints_sequence: List of pose keypoints | |
| Returns: | |
| Smoothness score (0-100, higher is smoother) | |
| """ | |
| if len(keypoints_sequence) < 3: | |
| return 100.0 # Not enough data | |
| # Calculate velocities | |
| velocities = self._calculate_velocities(keypoints_sequence) | |
| if len(velocities) < 2: | |
| return 100.0 | |
| # Calculate jerk (rate of change of velocity) | |
| jerk = np.abs(np.diff(velocities)) | |
| avg_jerk = np.mean(jerk) | |
| # Convert to smoothness score (inverse of jerk) | |
| # Lower jerk = higher smoothness | |
| smoothness = max(0, 100 - (avg_jerk * 1000)) | |
| return round(smoothness, 2) | |
| def _create_empty_metrics(self) -> MovementMetrics: | |
| """Create empty metrics for cases with no data""" | |
| return MovementMetrics( | |
| movement_type=MovementType.UNKNOWN, | |
| intensity=0.0, | |
| velocity=0.0, | |
| body_part_activity={part: 0.0 for part in self.BODY_PARTS.keys()}, | |
| frame_range=(0, 0) | |
| ) | |
| def reset(self): | |
| """Reset movement history""" | |
| self.movement_history.clear() | |
| logger.info("MovementClassifier reset") |